assorted-commits Mailing List for Assorted projects (Page 20)
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-05-24 21:05:15

Revision: 1427
          http://assorted.svn.sourceforge.net/assorted/?rev=1427&view=rev
Author:   yangzhang
Date:     2009-05-24 21:05:06 +0000 (Sun, 24 May 2009)

Log Message:
-----------
removed old version of the site

Removed Paths:
-------------
    pitch-in/trunk/src/helloworld/

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
From: <yan...@us...> - 2009-05-24 20:51:33

Revision: 1426
          http://assorted.svn.sourceforge.net/assorted/?rev=1426&view=rev
Author:   yangzhang
Date:     2009-05-24 20:51:27 +0000 (Sun, 24 May 2009)

Log Message:
-----------
ported pitch-in to django

Modified Paths:
--------------
    pitch-in/trunk/src/pitchin/pitchin/models.py
    pitch-in/trunk/src/pitchin/pitchin/settings.py
    pitch-in/trunk/src/pitchin/pitchin/urls.py
    pitch-in/trunk/src/pitchin/pitchin/views.py

Added Paths:
-----------
    pitch-in/trunk/src/pitchin/pitchin/templates/
    pitch-in/trunk/src/pitchin/pitchin/templates/created.html
    pitch-in/trunk/src/pitchin/pitchin/templates/index.html
    pitch-in/trunk/src/pitchin/pitchin/templates/main.html
    pitch-in/trunk/src/pitchin/pitchin/templates/pool.html

Modified: pitch-in/trunk/src/pitchin/pitchin/models.py
===================================================================
--- pitch-in/trunk/src/pitchin/pitchin/models.py	2009-05-24 20:02:51 UTC (rev 1425)
+++ pitch-in/trunk/src/pitchin/pitchin/models.py	2009-05-24 20:51:27 UTC (rev 1426)
@@ -1,5 +1,5 @@
 from google.appengine.ext import db
 
-class Visitor(db.Model):
-    ip = db.StringProperty()
-    added_on = db.DateTimeProperty(auto_now_add=True)
+class Pool(db.Model):
+    descrip = db.StringProperty()
+    contribs = db.BlobProperty()

Modified: pitch-in/trunk/src/pitchin/pitchin/settings.py
===================================================================
--- pitch-in/trunk/src/pitchin/pitchin/settings.py	2009-05-24 20:02:51 UTC (rev 1425)
+++ pitch-in/trunk/src/pitchin/pitchin/settings.py	2009-05-24 20:51:27 UTC (rev 1426)
@@ -1,5 +1,7 @@
 # Django settings for pitchin project.
 
+import os
+
 DEBUG = True
 TEMPLATE_DEBUG = DEBUG
@@ -65,10 +67,13 @@
 
 ROOT_URLCONF = 'pitchin.urls'
 
+ROOT_PATH = os.path.dirname(__file__)
+
 TEMPLATE_DIRS = (
     # Put strings here, like "/home/html/django_templates" or "C:/www/django/templates".
     # Always use forward slashes, even on Windows.
     # Don't forget to use absolute paths, not relative paths.
+    ROOT_PATH + '/templates'
 )
 
 INSTALLED_APPS = (
@@ -77,3 +82,5 @@
     'django.contrib.sessions',
     'django.contrib.sites',
 )
+
+APPEND_SLASH = False

Added: pitch-in/trunk/src/pitchin/pitchin/templates/created.html
===================================================================
--- pitch-in/trunk/src/pitchin/pitchin/templates/created.html	(rev 0)
+++ pitch-in/trunk/src/pitchin/pitchin/templates/created.html	2009-05-24 20:51:27 UTC (rev 1426)
@@ -0,0 +1,12 @@
+<html>
+  <body>
+    <p>
+      Congrats! You've successfully created a new money pool.
+      Send this URL out to anyone you're inviting to
+      participate:
+    </p>
+    <p>
+      <a href="/pool?key={{key}}">http://pitch-in.appspot.com/pool?key={{key}}</a>
+    </p>
+  </body>
+</html>

Added: pitch-in/trunk/src/pitchin/pitchin/templates/index.html
===================================================================
--- pitch-in/trunk/src/pitchin/pitchin/templates/index.html	(rev 0)
+++ pitch-in/trunk/src/pitchin/pitchin/templates/index.html	2009-05-24 20:51:27 UTC (rev 1426)
@@ -0,0 +1,5 @@
+<html>
+<body>
+hi
+</body>
+</html>

Added: pitch-in/trunk/src/pitchin/pitchin/templates/main.html
===================================================================
--- pitch-in/trunk/src/pitchin/pitchin/templates/main.html	(rev 0)
+++ pitch-in/trunk/src/pitchin/pitchin/templates/main.html	2009-05-24 20:51:27 UTC (rev 1426)
@@ -0,0 +1,23 @@
+<html>
+  <body>
+    <p style="color: red">{{error}}</p>
+    <p>
+      Using PitchIn, you can easily create and manage a money pool that others
+      can contribute to.
+    </p>
+    <p>
+      To create a pool, just enter a description of what it's for here, and you
+      will be provided a URL that you can distribute to the people you want to
+      participate in the pool.
+    </p>
+    <form action="/create" method="post">
+      <div><input type="text" name="descrip"/></div>
+      <div><input type="submit" width="50" value="Create Pool"></div>
+    </form>
+    <p>Example descriptions:</p>
+    <ul>
+      <li>Birthday gift for Amy (we're aiming for $180 Rock Band set)</li>
+      <li>Holiday vacation trip</li>
+    </ul>
+  </body>
+</html>

Added: pitch-in/trunk/src/pitchin/pitchin/templates/pool.html
===================================================================
--- pitch-in/trunk/src/pitchin/pitchin/templates/pool.html	(rev 0)
+++ pitch-in/trunk/src/pitchin/pitchin/templates/pool.html	2009-05-24 20:51:27 UTC (rev 1426)
@@ -0,0 +1,23 @@
+<html>
+  <body>
+    <p style="color: red">{{error}}</p>
+    <p>Description of pool: {{descrip}}</p>
+    {{contribs}}
+    <p>
+      To add or update your own contribution to the pool, enter
+      the following information:
+    </p>
+    <form action="/pool" method="post">
+      <input type="hidden" name="key" value="{{key}}"/>
+      <div>
+        <label name="name">Name:</label>
+        <input type="text" name="name"/>
+      </div>
+      <div>
+        <label name="amount">Amount:</label>
+        $<input type="text" name="amount"/>.00
+      </div>
+      <div><input type="submit" value="Enter the pool!"/></div>
+    </form>
+  </body>
+</html>

Modified: pitch-in/trunk/src/pitchin/pitchin/urls.py
===================================================================
--- pitch-in/trunk/src/pitchin/pitchin/urls.py	2009-05-24 20:02:51 UTC (rev 1425)
+++ pitch-in/trunk/src/pitchin/pitchin/urls.py	2009-05-24 20:51:27 UTC (rev 1426)
@@ -5,8 +5,9 @@
 # admin.autodiscover()
 
 urlpatterns = patterns('',
-    # Example:
-    (r'.*', 'pitchin.views.main'),
+    (r'^$', 'pitchin.views.main'),
+    (r'^create$', 'pitchin.views.create'),
+    (r'^pool$', 'pitchin.views.pool'),
 
     # Uncomment the admin/doc line below and add 'django.contrib.admindocs'
     # to INSTALLED_APPS to enable admin documentation:

Modified: pitch-in/trunk/src/pitchin/pitchin/views.py
===================================================================
--- pitch-in/trunk/src/pitchin/pitchin/views.py	2009-05-24 20:02:51 UTC (rev 1425)
+++ pitch-in/trunk/src/pitchin/pitchin/views.py	2009-05-24 20:51:27 UTC (rev 1426)
@@ -1,9 +1,64 @@
-from django.http import HttpResponse
-from pitchin.models import Visitor
+from django.http import HttpResponseRedirect
+from django.shortcuts import render_to_response
+from google.appengine.ext import db
+from pitchin.models import Pool
+import cPickle, cgi
 
+def getPool(request): return db.get(db.Key(request.REQUEST['key']))
+
+def getContribs(pool): return cPickle.loads(str(pool.contribs))
+def serContribs(contribs): return cPickle.dumps(contribs, 2)
+def showContribs(pool):
+    contribs = getContribs(pool)
+    if len(contribs) == 0:
+        return '<p>(no participants yet)</p>'
+    else:
+        s = '<ul>'
+        for name, amount in contribs.iteritems():
+            s += '<li>%s: $%d.00</li>' % (cgi.escape(name), amount)
+        s += '<li>TOTAL: $%d.00</li></ul>' % (sum(contribs.itervalues()),)
+        return s
+
+class invalid_submit(Exception): pass
+def validate(pred, msg):
+    if not pred: raise invalid_submit(msg)
+
+### Views
+
 def main(request):
-    visitor = Visitor()
-    visitor.ip = request.META["REMOTE_ADDR"]
-    visitor.put()
-    return HttpResponse( u'%s visited on %s' % (v.ip, v.added_on)
-                         for v in Visitors.all().order('-added_on').fetch(limit=40) )
+    return render_to_response('main.html')
+
+def create(request):
+    if request.REQUEST['descrip'].strip() == '':
+        return render_to_response('main.html', {'error': 'Must specify a description'})
+    pool = Pool(descrip = request.REQUEST['descrip'], contribs = cPickle.dumps({}))
+    pool.put()
+    return render_to_response('created.html', {'key': pool.key()})
+
+def pool(request):
+    if request.method == 'GET': return view(request)
+    else: return update(request)
+
+def view(request, error = ''):
+    pool = getPool(request)
+    return render_to_response('pool.html',
+        dict(error = error,
+             descrip = cgi.escape(pool.descrip),
+             contribs = showContribs(pool),
+             key = request.REQUEST['key']))
+
+def update(request):
+    try:
+        validate(request.REQUEST['name'].strip() != '',
+                 'You must enter a name.')
+        try: amount = int(request.REQUEST['amount'])
+        except ValueError: raise invalid_submit('You must enter an integer dollar amount.')
+    except invalid_submit, ex:
+        return view(request, error = ex)
+    else:
+        pool = getPool(request)
+        contribs = getContribs(pool)
+        contribs[request.REQUEST['name']] = amount
+        pool.contribs = cPickle.dumps(contribs)
+        pool.put()
+        return HttpResponseRedirect('/pool?key=%s' % pool.key())
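The `getContribs`/`serContribs` helpers in the diff above round-trip the contributions dict through a pickled blob on the `Pool` entity. A minimal sketch of that storage pattern in Python 3 (using `pickle` in place of the Python 2 `cPickle`, and a plain variable in place of the datastore property; names here are illustrative only):

```python
import pickle

def ser_contribs(contribs):
    # Serialize the contributions dict to a byte blob; protocol 2
    # matches the original cPickle.dumps(contribs, 2).
    return pickle.dumps(contribs, 2)

def get_contribs(blob):
    # Deserialize the blob back into a {name: amount} dict.
    return pickle.loads(blob)

contribs = {"Amy": 20, "Bob": 15}
blob = ser_contribs(contribs)          # what would be stored in Pool.contribs
total = sum(get_contribs(blob).values())
```

Pickling the whole dict keeps the datastore schema to a single blob property, at the cost of making stored data opaque to queries and tied to the pickle format.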
From: <yan...@us...> - 2009-05-24 20:02:56
Revision: 1425 http://assorted.svn.sourceforge.net/assorted/?rev=1425&view=rev Author: yangzhang Date: 2009-05-24 20:02:51 +0000 (Sun, 24 May 2009) Log Message: ----------- added simple django app Added Paths: ----------- pitch-in/trunk/src/pitchin/ pitch-in/trunk/src/pitchin/app.yaml pitch-in/trunk/src/pitchin/main.py pitch-in/trunk/src/pitchin/pitchin/ pitch-in/trunk/src/pitchin/pitchin/__init__.py pitch-in/trunk/src/pitchin/pitchin/manage.py pitch-in/trunk/src/pitchin/pitchin/models.py pitch-in/trunk/src/pitchin/pitchin/settings.py pitch-in/trunk/src/pitchin/pitchin/urls.py pitch-in/trunk/src/pitchin/pitchin/views.py Added: pitch-in/trunk/src/pitchin/app.yaml =================================================================== --- pitch-in/trunk/src/pitchin/app.yaml (rev 0) +++ pitch-in/trunk/src/pitchin/app.yaml 2009-05-24 20:02:51 UTC (rev 1425) @@ -0,0 +1,8 @@ +application: pitch-in +version: 1 +runtime: python +api_version: 1 + +handlers: +- url: /.* + script: main.py Added: pitch-in/trunk/src/pitchin/main.py =================================================================== --- pitch-in/trunk/src/pitchin/main.py (rev 0) +++ pitch-in/trunk/src/pitchin/main.py 2009-05-24 20:02:51 UTC (rev 1425) @@ -0,0 +1,39 @@ +import logging, os + +# Google App Engine imports. +from google.appengine.ext.webapp import util + +# Must set this env var before importing any part of Django +os.environ['DJANGO_SETTINGS_MODULE'] = 'pitchin.settings' + +# Force Django to reload its settings. +from django.conf import settings +settings._target = None + +import logging +import django.core.handlers.wsgi +import django.core.signals +import django.db +import django.dispatch.dispatcher + +def log_exception(*args, **kwds): + logging.exception('Exception in request:') + +# Log errors. +django.dispatch.dispatcher.connect( + log_exception, django.core.signals.got_request_exception) + +# Unregister the rollback event handler. 
+django.dispatch.dispatcher.disconnect( + django.db._rollback_on_exception, + django.core.signals.got_request_exception) + +def main(): + # Create a Django application for WSGI. + application = django.core.handlers.wsgi.WSGIHandler() + + # Run the WSGI CGI handler with that application. + util.run_wsgi_app(application) + +if __name__ == '__main__': + main() Added: pitch-in/trunk/src/pitchin/pitchin/manage.py =================================================================== --- pitch-in/trunk/src/pitchin/pitchin/manage.py (rev 0) +++ pitch-in/trunk/src/pitchin/pitchin/manage.py 2009-05-24 20:02:51 UTC (rev 1425) @@ -0,0 +1,11 @@ +#!/usr/bin/python +from django.core.management import execute_manager +try: + import settings # Assumed to be in the same directory. +except ImportError: + import sys + sys.stderr.write("Error: Can't find the file 'settings.py' in the directory containing %r. It appears you've customized things.\nYou'll have to run django-admin.py, passing it your settings module.\n(If the file settings.py does indeed exist, it's causing an ImportError somehow.)\n" % __file__) + sys.exit(1) + +if __name__ == "__main__": + execute_manager(settings) Property changes on: pitch-in/trunk/src/pitchin/pitchin/manage.py ___________________________________________________________________ Added: svn:executable + * Added: pitch-in/trunk/src/pitchin/pitchin/models.py =================================================================== --- pitch-in/trunk/src/pitchin/pitchin/models.py (rev 0) +++ pitch-in/trunk/src/pitchin/pitchin/models.py 2009-05-24 20:02:51 UTC (rev 1425) @@ -0,0 +1,5 @@ +from google.appengine.ext import db + +class Visitor(db.Model): + ip = db.StringProperty() + added_on = db.DateTimeProperty(auto_now_add=True) Added: pitch-in/trunk/src/pitchin/pitchin/settings.py =================================================================== --- pitch-in/trunk/src/pitchin/pitchin/settings.py (rev 0) +++ pitch-in/trunk/src/pitchin/pitchin/settings.py 2009-05-24 
20:02:51 UTC (rev 1425) @@ -0,0 +1,79 @@ +# Django settings for pitchin project. + +DEBUG = True +TEMPLATE_DEBUG = DEBUG + +ADMINS = ( + # ('Your Name', 'you...@do...'), +) + +MANAGERS = ADMINS + +DATABASE_ENGINE = '' # 'postgresql_psycopg2', 'postgresql', 'mysql', 'sqlite3' or 'oracle'. +DATABASE_NAME = '' # Or path to database file if using sqlite3. +DATABASE_USER = '' # Not used with sqlite3. +DATABASE_PASSWORD = '' # Not used with sqlite3. +DATABASE_HOST = '' # Set to empty string for localhost. Not used with sqlite3. +DATABASE_PORT = '' # Set to empty string for default. Not used with sqlite3. + +# Local time zone for this installation. Choices can be found here: +# http://en.wikipedia.org/wiki/List_of_tz_zones_by_name +# although not all choices may be available on all operating systems. +# If running in a Windows environment this must be set to the same as your +# system time zone. +TIME_ZONE = 'America/Chicago' + +# Language code for this installation. All choices can be found here: +# http://www.i18nguy.com/unicode/language-identifiers.html +LANGUAGE_CODE = 'en-us' + +SITE_ID = 1 + +# If you set this to False, Django will make some optimizations so as not +# to load the internationalization machinery. +USE_I18N = True + +# Absolute path to the directory that holds media. +# Example: "/home/media/media.lawrence.com/" +MEDIA_ROOT = '' + +# URL that handles the media served from MEDIA_ROOT. Make sure to use a +# trailing slash if there is a path component (optional in other cases). +# Examples: "http://media.lawrence.com", "http://example.com/media/" +MEDIA_URL = '' + +# URL prefix for admin media -- CSS, JavaScript and images. Make sure to use a +# trailing slash. +# Examples: "http://foo.com/media/", "/media/". +ADMIN_MEDIA_PREFIX = '/media/' + +# Make this unique, and don't share it with anybody. +SECRET_KEY = 'j+1_^qvj*i3a@wnrl*ravo1r^&80+i$*sn9j3&*o%iw5y&fl7s' + +# List of callables that know how to import templates from various sources. 
+TEMPLATE_LOADERS = ( + 'django.template.loaders.filesystem.load_template_source', + 'django.template.loaders.app_directories.load_template_source', +# 'django.template.loaders.eggs.load_template_source', +) + +MIDDLEWARE_CLASSES = ( + 'django.middleware.common.CommonMiddleware', + 'django.contrib.sessions.middleware.SessionMiddleware', + 'django.contrib.auth.middleware.AuthenticationMiddleware', +) + +ROOT_URLCONF = 'pitchin.urls' + +TEMPLATE_DIRS = ( + # Put strings here, like "/home/html/django_templates" or "C:/www/django/templates". + # Always use forward slashes, even on Windows. + # Don't forget to use absolute paths, not relative paths. +) + +INSTALLED_APPS = ( + 'django.contrib.auth', + 'django.contrib.contenttypes', + 'django.contrib.sessions', + 'django.contrib.sites', +) Added: pitch-in/trunk/src/pitchin/pitchin/urls.py =================================================================== --- pitch-in/trunk/src/pitchin/pitchin/urls.py (rev 0) +++ pitch-in/trunk/src/pitchin/pitchin/urls.py 2009-05-24 20:02:51 UTC (rev 1425) @@ -0,0 +1,17 @@ +from django.conf.urls.defaults import * + +# Uncomment the next two lines to enable the admin: +# from django.contrib import admin +# admin.autodiscover() + +urlpatterns = patterns('', + # Example: + (r'.*', 'pitchin.views.main'), + + # Uncomment the admin/doc line below and add 'django.contrib.admindocs' + # to INSTALLED_APPS to enable admin documentation: + # (r'^admin/doc/', include('django.contrib.admindocs.urls')), + + # Uncomment the next line to enable the admin: + # (r'^admin/(.*)', admin.site.root), +) Added: pitch-in/trunk/src/pitchin/pitchin/views.py =================================================================== --- pitch-in/trunk/src/pitchin/pitchin/views.py (rev 0) +++ pitch-in/trunk/src/pitchin/pitchin/views.py 2009-05-24 20:02:51 UTC (rev 1425) @@ -0,0 +1,9 @@ +from django.http import HttpResponse +from pitchin.models import Visitor + +def main(request): + visitor = Visitor() + visitor.ip = 
request.META["REMOTE_ADDR"] + visitor.put() + return HttpResponse( u'%s visited on %s' % (v.ip, v.added_on) + for v in Visitors.all().order('-added_on').fetch(limit=40) ) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
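The `main.py` added in this revision wires Django's WSGI handler into App Engine's `run_wsgi_app`. The WSGI contract underneath can be sketched without either framework (a hypothetical minimal app, driven by hand the way any WSGI server would call it):

```python
from wsgiref.util import setup_testing_defaults

def app(environ, start_response):
    # A WSGI application: called with the request environ dict and a
    # start_response callable, it returns an iterable of body bytes.
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"hi"]

# Build a minimal environ and invoke the app directly, capturing the
# status and headers it reports via start_response.
environ = {}
setup_testing_defaults(environ)
captured = []
body = b"".join(app(environ, lambda status, headers: captured.append((status, headers))))
```

`run_wsgi_app` (and `django.core.handlers.wsgi.WSGIHandler`) are just producers and consumers of this same `(environ, start_response)` interface.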
From: <yan...@us...> - 2009-05-19 00:09:41
Revision: 1424 http://assorted.svn.sourceforge.net/assorted/?rev=1424&view=rev Author: yangzhang Date: 2009-05-19 00:09:28 +0000 (Tue, 19 May 2009) Log Message: ----------- added pitch-in Added Paths: ----------- pitch-in/ pitch-in/trunk/ pitch-in/trunk/src/ pitch-in/trunk/src/helloworld/ pitch-in/trunk/src/helloworld/app.yaml pitch-in/trunk/src/helloworld/pitchin.py Added: pitch-in/trunk/src/helloworld/app.yaml =================================================================== --- pitch-in/trunk/src/helloworld/app.yaml (rev 0) +++ pitch-in/trunk/src/helloworld/app.yaml 2009-05-19 00:09:28 UTC (rev 1424) @@ -0,0 +1,8 @@ +application: pitch-in +version: 1 +runtime: python +api_version: 1 + +handlers: +- url: /.* + script: pitchin.py Added: pitch-in/trunk/src/helloworld/pitchin.py =================================================================== --- pitch-in/trunk/src/helloworld/pitchin.py (rev 0) +++ pitch-in/trunk/src/helloworld/pitchin.py 2009-05-19 00:09:28 UTC (rev 1424) @@ -0,0 +1,138 @@ +import cgi + +# TODO: add anonymization feature + +from google.appengine.api import users +from google.appengine.ext import webapp +from google.appengine.ext.webapp.util import run_wsgi_app +from google.appengine.ext import db +import cPickle, logging + +def getContribs(pool): return cPickle.loads(str(pool.contribs)) +def serContribs(contribs): return cPickle.dumps(contribs, 2) +def showContribs(pool): + contribs = getContribs(pool) + if len(contribs) == 0: + return '<p>(no participants yet)</p>' + else: + s = '<ul>' + for name, amount in contribs.iteritems(): + s += '<li>%s: $%d.00</li>' % (cgi.escape(name), amount) + s += '<li>TOTAL: $%d.00</li></ul>' % (sum(contribs.itervalues()),) + return s + +class invalid_submit(Exception): pass +def validate(pred, msg): + if not pred: raise invalid_submit(msg) + +class Pool(db.Model): + descrip = db.StringProperty() + contribs = db.BlobProperty() + +def mainPage(page, error = ''): + page.response.out.write(r""" + <html> + <body> + 
<p style="color: red">%(error)s</p> + <p> + To create a pool, just enter a description of what it's + for here, and you will be provided a URL that you can + distribute to the people you want to participate in the + pool. + </p> + <form action="/create" method="post"> + <div><input type="text" name="descrip"/></div> + <div><input type="submit" width="50" value="Create Pool"></div> + </form> + <p>Examples:</p> + <ul> + <li>Birthday gift for Amy (aiming for $180 Rock Band set)</li> + <li>Holiday vacation trip</li> + </ul> + </body> + </html>""" % {'error': error}) + +class MainPage(webapp.RequestHandler): + def get(self): + mainPage(self) + +class CreatePage(webapp.RequestHandler): + def post(self): + if self.request.get('descrip').strip() == '': + mainPage(self, 'Must specify a description') + else: + pool = Pool() + pool.descrip = self.request.get('descrip') + pool.contribs = cPickle.dumps({}) + pool.put() + self.response.out.write(r""" + <html> + <body> + <p> + Congrats! You've successfully created a new money pool. 
+ Send this URL out to anyone you're inviting to + participate: + </p> + <p> + <a + href="/pool?key=%(key)s">http://pitch-in.appspot.com/pool?key=%(key)s</a> + </p> + </body> + </html>""" % {'key': pool.key()}) + +class PoolPage(webapp.RequestHandler): + def getPool(self): + return db.get(db.Key(self.request.get('key'))) + def get(self, error = ''): + pool = self.getPool() + self.response.out.write(r""" + <html> + <body> + <p style="color: red">%(error)s</p> + <p>Description of pool: %(descrip)s</p> + %(contribs)s + <p> + To add or update your own contribution to the pool, enter + the following information: + </p> + <form action="/pool" method="post"> + <input type="hidden" name="key" value="%(key)s"/> + <div> + <label name="name">Name:</label> + <input type="text" name="name"/> + </div> + <div> + <label name="amount">Amount:</label> + $<input type="text" name="amount"/>.00 + </div> + <div><input type="submit" value="Enter the pool!"/></div> + </form> + </body> + </html> + """ % {'error': error, + 'descrip': cgi.escape(pool.descrip), + 'contribs': showContribs(pool), + 'key': cgi.escape(self.request.get('key'))}) + def post(self): + try: + validate(self.request.get('name').strip() != '', + 'You must enter a name.') + try: amount = int(self.request.get('amount')) + except ValueError: raise invalid_submit('You must enter an integer dollar amount.') + except invalid_submit, ex: + self.get(ex) + else: + pool = self.getPool() + contribs = getContribs(pool) + contribs[self.request.get('name')] = amount + pool.contribs = cPickle.dumps(contribs) + pool.put() + self.redirect('/pool?key=%s' % pool.key()) + +application = webapp.WSGIApplication([('/', MainPage), + ('/create', CreatePage), + ('/pool', PoolPage)], + debug = True) + +def main(): run_wsgi_app(application) +if __name__ == "__main__": main() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-18 17:00:27

Revision: 1423
          http://assorted.svn.sourceforge.net/assorted/?rev=1423&view=rev
Author:   yangzhang
Date:     2009-05-18 17:00:15 +0000 (Mon, 18 May 2009)

Log Message:
-----------
added comment

Modified Paths:
--------------
    sandbox/trunk/src/py/sqlitetest.py

Modified: sandbox/trunk/src/py/sqlitetest.py
===================================================================
--- sandbox/trunk/src/py/sqlitetest.py	2009-05-18 16:59:10 UTC (rev 1422)
+++ sandbox/trunk/src/py/sqlitetest.py	2009-05-18 17:00:15 UTC (rev 1423)
@@ -9,13 +9,14 @@
     conn.execute('INSERT INTO shelf (key, value) VALUES (?,?)', (i, i))
 conn.commit()
 
-# This will loop forever.
 if 0:
+    # This will loop forever.
     for i, in conn.cursor().execute('SELECT key FROM shelf ORDER BY ROWID'):
         conn.execute('REPLACE INTO shelf (key, value) VALUES (?,?)', (i, i))
         conn.commit()
         print i
 else:
+    # This will timeout trying to acquire a lock on the DB from conn.
     conn2 = sqlite3.connect('/tmp/db')
     conn2.text_factory = bytes
     for i, in conn2.execute('SELECT key FROM shelf ORDER BY ROWID'):
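The comments added above document the two failure modes: REPLACEing into the table while a cursor on the same connection iterates it loops forever, and doing so from a second connection blocks on the database lock. A safe variant is to materialize the keys before writing. A minimal sketch (assumptions: an in-memory database, and `key` promoted to `INTEGER PRIMARY KEY` so that `REPLACE` actually replaces rows; in the original schema, which has no unique constraint, `REPLACE` only ever appends):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Unlike the original schema, key is a PRIMARY KEY here so that
# REPLACE updates the conflicting row instead of inserting a new one.
conn.execute("CREATE TABLE shelf (key INTEGER PRIMARY KEY, value INTEGER NOT NULL)")
conn.executemany("INSERT INTO shelf (key, value) VALUES (?, ?)",
                 [(i, i) for i in range(3)])
conn.commit()

# Materialize the keys first so no SELECT cursor is open on the table
# while the REPLACE statements run.
keys = [k for (k,) in conn.execute("SELECT key FROM shelf ORDER BY rowid")]
for k in keys:
    conn.execute("REPLACE INTO shelf (key, value) VALUES (?, ?)", (k, k + 10))
conn.commit()

rows = sorted(conn.execute("SELECT key, value FROM shelf"))
```

Fetching the key list up front trades memory for isolation: the read is finished before the first write begins, so neither the infinite-loop nor the lock-timeout case can arise.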
From: <yan...@us...> - 2009-05-18 16:59:23

Revision: 1422
          http://assorted.svn.sourceforge.net/assorted/?rev=1422&view=rev
Author:   yangzhang
Date:     2009-05-18 16:59:10 +0000 (Mon, 18 May 2009)

Log Message:
-----------
added simple demo of isolation levels in sqlite

Added Paths:
-----------
    sandbox/trunk/src/py/sqlitetest.py

Added: sandbox/trunk/src/py/sqlitetest.py
===================================================================
--- sandbox/trunk/src/py/sqlitetest.py	(rev 0)
+++ sandbox/trunk/src/py/sqlitetest.py	2009-05-18 16:59:10 UTC (rev 1422)
@@ -0,0 +1,24 @@
+#!/usr/bin/env python
+
+import sqlite3
+
+conn = sqlite3.connect('/tmp/db')
+conn.text_factory = bytes
+conn.execute('CREATE TABLE shelf (key INTEGER NOT NULL, value INTEGER NOT NULL)')
+for i in xrange(3):
+    conn.execute('INSERT INTO shelf (key, value) VALUES (?,?)', (i, i))
+conn.commit()
+
+# This will loop forever.
+if 0:
+    for i, in conn.cursor().execute('SELECT key FROM shelf ORDER BY ROWID'):
+        conn.execute('REPLACE INTO shelf (key, value) VALUES (?,?)', (i, i))
+        conn.commit()
+        print i
+else:
+    conn2 = sqlite3.connect('/tmp/db')
+    conn2.text_factory = bytes
+    for i, in conn2.execute('SELECT key FROM shelf ORDER BY ROWID'):
+        conn.execute('REPLACE INTO shelf (key, value) VALUES (?,?)', (i, i))
+        conn.commit()
+        print i

Property changes on: sandbox/trunk/src/py/sqlitetest.py
___________________________________________________________________
Added: svn:executable
   + *
From: <yan...@us...> - 2009-05-15 16:48:09
Revision: 1421 http://assorted.svn.sourceforge.net/assorted/?rev=1421&view=rev Author: yangzhang Date: 2009-05-15 16:48:03 +0000 (Fri, 15 May 2009) Log Message: ----------- - added jorendorff's path module; - added customm shelf for sqlhash Modified Paths: -------------- python-commons/trunk/src/commons/sqlhash.py Added Paths: ----------- python-commons/trunk/src/commons/path.py Added: python-commons/trunk/src/commons/path.py =================================================================== --- python-commons/trunk/src/commons/path.py (rev 0) +++ python-commons/trunk/src/commons/path.py 2009-05-15 16:48:03 UTC (rev 1421) @@ -0,0 +1,971 @@ +""" path.py - An object representing a path to a file or directory. + +Example: + +from path import path +d = path('/home/guido/bin') +for f in d.files('*.py'): + f.chmod(0755) + +This module requires Python 2.6 or later. + + +URL: http://www.jorendorff.com/articles/python/path +Author: Jason Orendorff <jason.orendorff\x40gmail\x2ecom> (and others - see the url!) + Yang Zhang <yaaang\x40gmail\x2ecom> +Date: 2009 +""" + + +# TODO +# - Tree-walking functions don't avoid symlink loops. Matt Harrison +# sent me a patch for this. +# - Bug in write_text(). It doesn't support Universal newline mode. +# - Better error message in listdir() when self isn't a +# directory. (On Windows, the error message really sucks.) +# - Make sure everything has a good docstring. +# - Add methods for regex find and replace. +# - guess_content_type() method? +# - Perhaps support arguments to touch(). + +from __future__ import generators + +import sys, warnings, os, fnmatch, glob, shutil, codecs, hashlib + +__version__ = '2.6' +__all__ = ['path'] + +# Platform-specific support for path.owner +if os.name == 'nt': + try: + import win32security + except ImportError: + win32security = None +else: + try: + import pwd + except ImportError: + pwd = None + +# Pre-2.3 support. Are unicode filenames supported? 
+_base = str +_getcwd = os.getcwd +try: + if os.path.supports_unicode_filenames: + _base = unicode + _getcwd = os.getcwdu +except AttributeError: + pass + +# Pre-2.3 workaround for booleans +try: + True, False +except NameError: + True, False = 1, 0 + +# Pre-2.3 workaround for basestring. +try: + basestring +except NameError: + basestring = (str, unicode) + +# Universal newline support +_textmode = 'r' +if hasattr(file, 'newlines'): + _textmode = 'U' + + +class TreeWalkWarning(Warning): + pass + +class path(_base): + """ Represents a filesystem path. + + For documentation on individual methods, consult their + counterparts in os.path. + """ + + # --- Special Python methods. + + def __repr__(self): + return 'path(%s)' % _base.__repr__(self) + + # Adding a path and a string yields a path. + def __add__(self, more): + try: + resultStr = _base.__add__(self, more) + except TypeError: #Python bug + resultStr = NotImplemented + if resultStr is NotImplemented: + return resultStr + return self.__class__(resultStr) + + def __radd__(self, other): + if isinstance(other, basestring): + return self.__class__(other.__add__(self)) + else: + return NotImplemented + + # The / operator joins paths. + def __div__(self, rel): + """ fp.__div__(rel) == fp / rel == fp.joinpath(rel) + + Join two path components, adding a separator character if + needed. + """ + return self.__class__(os.path.join(self, rel)) + + # Make the / operator work even when true division is enabled. + __truediv__ = __div__ + + def getcwd(cls): + """ Return the current working directory as a path object. """ + return cls(_getcwd()) + getcwd = classmethod(getcwd) + + + # --- Operations on path strings. 
+ + isabs = os.path.isabs + def abspath(self): return self.__class__(os.path.abspath(self)) + def normcase(self): return self.__class__(os.path.normcase(self)) + def normpath(self): return self.__class__(os.path.normpath(self)) + def realpath(self): return self.__class__(os.path.realpath(self)) + def expanduser(self): return self.__class__(os.path.expanduser(self)) + def expandvars(self): return self.__class__(os.path.expandvars(self)) + def dirname(self): return self.__class__(os.path.dirname(self)) + basename = os.path.basename + + def expand(self): + """ Clean up a filename by calling expandvars(), + expanduser(), and normpath() on it. + + This is commonly everything needed to clean up a filename + read from a configuration file, for example. + """ + return self.expandvars().expanduser().normpath() + + def _get_namebase(self): + base, ext = os.path.splitext(self.name) + return base + + def _get_ext(self): + f, ext = os.path.splitext(_base(self)) + return ext + + def _get_drive(self): + drive, r = os.path.splitdrive(self) + return self.__class__(drive) + + parent = property( + dirname, None, None, + """ This path's parent directory, as a new path object. + + For example, path('/usr/local/lib/libpython.so').parent == path('/usr/local/lib') + """) + + name = property( + basename, None, None, + """ The name of this file or directory without the full path. + + For example, path('/usr/local/lib/libpython.so').name == 'libpython.so' + """) + + namebase = property( + _get_namebase, None, None, + """ The same as path.name, but with one file extension stripped off. + + For example, path('/home/guido/python.tar.gz').name == 'python.tar.gz', + but path('/home/guido/python.tar.gz').namebase == 'python.tar' + """) + + ext = property( + _get_ext, None, None, + """ The file extension, for example '.py'. """) + + drive = property( + _get_drive, None, None, + """ The drive specifier, for example 'C:'. + This is always empty on systems that don't use drive specifiers. 
+ """) + + def splitpath(self): + """ p.splitpath() -> Return (p.parent, p.name). """ + parent, child = os.path.split(self) + return self.__class__(parent), child + + def splitdrive(self): + """ p.splitdrive() -> Return (p.drive, <the rest of p>). + + Split the drive specifier from this path. If there is + no drive specifier, p.drive is empty, so the return value + is simply (path(''), p). This is always the case on Unix. + """ + drive, rel = os.path.splitdrive(self) + return self.__class__(drive), rel + + def splitext(self): + """ p.splitext() -> Return (p.stripext(), p.ext). + + Split the filename extension from this path and return + the two parts. Either part may be empty. + + The extension is everything from '.' to the end of the + last path segment. This has the property that if + (a, b) == p.splitext(), then a + b == p. + """ + filename, ext = os.path.splitext(self) + return self.__class__(filename), ext + + def stripext(self): + """ p.stripext() -> Remove one file extension from the path. + + For example, path('/home/guido/python.tar.gz').stripext() + returns path('/home/guido/python.tar'). + """ + return self.splitext()[0] + + if hasattr(os.path, 'splitunc'): + def splitunc(self): + unc, rest = os.path.splitunc(self) + return self.__class__(unc), rest + + def _get_uncshare(self): + unc, r = os.path.splitunc(self) + return self.__class__(unc) + + uncshare = property( + _get_uncshare, None, None, + """ The UNC mount point for this path. + This is empty for paths on local drives. """) + + def joinpath(self, *args): + """ Join two or more path components, adding a separator + character (os.sep) if needed. Returns a new path + object. + """ + return self.__class__(os.path.join(self, *args)) + + def splitall(self): + r""" Return a list of the path components in this path. + + The first item in the list will be a path. Its value will be + either os.curdir, os.pardir, empty, or the root directory of + this path (for example, '/' or 'C:\\'). 
The other items in + the list will be strings. + + path.path.joinpath(*result) will yield the original path. + """ + parts = [] + loc = self + while loc != os.curdir and loc != os.pardir: + prev = loc + loc, child = prev.splitpath() + if loc == prev: + break + parts.append(child) + parts.append(loc) + parts.reverse() + return parts + + def relpath(self): + """ Return this path as a relative path, + based from the current working directory. + """ + cwd = self.__class__(os.getcwd()) + return cwd.relpathto(self) + + def relpathto(self, dest): + """ Return a relative path from self to dest. + + If there is no relative path from self to dest, for example if + they reside on different drives in Windows, then this returns + dest.abspath(). + """ + origin = self.abspath() + dest = self.__class__(dest).abspath() + + orig_list = origin.normcase().splitall() + # Don't normcase dest! We want to preserve the case. + dest_list = dest.splitall() + + if orig_list[0] != os.path.normcase(dest_list[0]): + # Can't get here from there. + return dest + + # Find the location where the two paths start to differ. + i = 0 + for start_seg, dest_seg in zip(orig_list, dest_list): + if start_seg != os.path.normcase(dest_seg): + break + i += 1 + + # Now i is the point where the two paths diverge. + # Need a certain number of "os.pardir"s to work up + # from the origin to the point of divergence. + segments = [os.pardir] * (len(orig_list) - i) + # Need to add the diverging part of dest_list. + segments += dest_list[i:] + if len(segments) == 0: + # If they happen to be identical, use os.curdir. + relpath = os.curdir + else: + relpath = os.path.join(*segments) + return self.__class__(relpath) + + # --- Listing, searching, walking, and matching + + def listdir(self, pattern=None): + """ D.listdir() -> List of items in this directory. + + Use D.files() or D.dirs() instead if you want a listing + of just files or just subdirectories. + + The elements of the list are path objects. 
+ + With the optional 'pattern' argument, this only lists + items whose names match the given pattern. + """ + names = os.listdir(self) + if pattern is not None: + names = fnmatch.filter(names, pattern) + return [self / child for child in names] + + def dirs(self, pattern=None): + """ D.dirs() -> List of this directory's subdirectories. + + The elements of the list are path objects. + This does not walk recursively into subdirectories + (but see path.walkdirs). + + With the optional 'pattern' argument, this only lists + directories whose names match the given pattern. For + example, d.dirs('build-*'). + """ + return [p for p in self.listdir(pattern) if p.isdir()] + + def files(self, pattern=None): + """ D.files() -> List of the files in this directory. + + The elements of the list are path objects. + This does not walk into subdirectories (see path.walkfiles). + + With the optional 'pattern' argument, this only lists files + whose names match the given pattern. For example, + d.files('*.pyc'). + """ + + return [p for p in self.listdir(pattern) if p.isfile()] + + def walk(self, pattern=None, errors='strict'): + """ D.walk() -> iterator over files and subdirs, recursively. + + The iterator yields path objects naming each child item of + this directory and its descendants. This requires that + D.isdir(). + + This performs a depth-first traversal of the directory tree. + Each directory is returned just before all its children. + + The errors= keyword argument controls behavior when an + error occurs. The default is 'strict', which causes an + exception. The other allowed values are 'warn', which + reports the error via warnings.warn(), and 'ignore'. 
+ """ + if errors not in ('strict', 'warn', 'ignore'): + raise ValueError("invalid errors parameter") + + try: + childList = self.listdir() + except Exception: + if errors == 'ignore': + return + elif errors == 'warn': + warnings.warn( + "Unable to list directory '%s': %s" + % (self, sys.exc_info()[1]), + TreeWalkWarning) + return + else: + raise + + for child in childList: + if pattern is None or child.fnmatch(pattern): + yield child + try: + isdir = child.isdir() + except Exception: + if errors == 'ignore': + isdir = False + elif errors == 'warn': + warnings.warn( + "Unable to access '%s': %s" + % (child, sys.exc_info()[1]), + TreeWalkWarning) + isdir = False + else: + raise + + if isdir: + for item in child.walk(pattern, errors): + yield item + + def walkdirs(self, pattern=None, errors='strict'): + """ D.walkdirs() -> iterator over subdirs, recursively. + + With the optional 'pattern' argument, this yields only + directories whose names match the given pattern. For + example, mydir.walkdirs('*test') yields only directories + with names ending in 'test'. + + The errors= keyword argument controls behavior when an + error occurs. The default is 'strict', which causes an + exception. The other allowed values are 'warn', which + reports the error via warnings.warn(), and 'ignore'. + """ + if errors not in ('strict', 'warn', 'ignore'): + raise ValueError("invalid errors parameter") + + try: + dirs = self.dirs() + except Exception: + if errors == 'ignore': + return + elif errors == 'warn': + warnings.warn( + "Unable to list directory '%s': %s" + % (self, sys.exc_info()[1]), + TreeWalkWarning) + return + else: + raise + + for child in dirs: + if pattern is None or child.fnmatch(pattern): + yield child + for subsubdir in child.walkdirs(pattern, errors): + yield subsubdir + + def walkfiles(self, pattern=None, errors='strict'): + """ D.walkfiles() -> iterator over files in D, recursively. 
+ + The optional argument, pattern, limits the results to files + with names that match the pattern. For example, + mydir.walkfiles('*.tmp') yields only files with the .tmp + extension. + """ + if errors not in ('strict', 'warn', 'ignore'): + raise ValueError("invalid errors parameter") + + try: + childList = self.listdir() + except Exception: + if errors == 'ignore': + return + elif errors == 'warn': + warnings.warn( + "Unable to list directory '%s': %s" + % (self, sys.exc_info()[1]), + TreeWalkWarning) + return + else: + raise + + for child in childList: + try: + isfile = child.isfile() + isdir = not isfile and child.isdir() + except: + if errors == 'ignore': + continue + elif errors == 'warn': + warnings.warn( + "Unable to access '%s': %s" + % (self, sys.exc_info()[1]), + TreeWalkWarning) + continue + else: + raise + + if isfile: + if pattern is None or child.fnmatch(pattern): + yield child + elif isdir: + for f in child.walkfiles(pattern, errors): + yield f + + def fnmatch(self, pattern): + """ Return True if self.name matches the given pattern. + + pattern - A filename pattern with wildcards, + for example '*.py'. + """ + return fnmatch.fnmatch(self.name, pattern) + + def glob(self, pattern): + """ Return a list of path objects that match the pattern. + + pattern - a path relative to this directory, with wildcards. + + For example, path('/users').glob('*/bin/*') returns a list + of all the files users have in their bin directories. + """ + cls = self.__class__ + return [cls(s) for s in glob.glob(_base(self / pattern))] + + + # --- Reading or writing an entire file at once. + + def open(self, mode='r'): + """ Open this file. Return a file object. """ + return file(self, mode) + + def bytes(self): + """ Open this file, read all bytes, return them as a string. """ + f = self.open('rb') + try: + return f.read() + finally: + f.close() + + def write_bytes(self, bytes, append=False): + """ Open this file and write the given bytes to it. 
+ + Default behavior is to overwrite any existing file. + Call p.write_bytes(bytes, append=True) to append instead. + """ + if append: + mode = 'ab' + else: + mode = 'wb' + f = self.open(mode) + try: + f.write(bytes) + finally: + f.close() + + def text(self, encoding=None, errors='strict'): + r""" Open this file, read it in, return the content as a string. + + This uses 'U' mode in Python 2.3 and later, so '\r\n' and '\r' + are automatically translated to '\n'. + + Optional arguments: + + encoding - The Unicode encoding (or character set) of + the file. If present, the content of the file is + decoded and returned as a unicode object; otherwise + it is returned as an 8-bit str. + errors - How to handle Unicode errors; see help(str.decode) + for the options. Default is 'strict'. + """ + if encoding is None: + # 8-bit + f = self.open(_textmode) + try: + return f.read() + finally: + f.close() + else: + # Unicode + f = codecs.open(self, 'r', encoding, errors) + # (Note - Can't use 'U' mode here, since codecs.open + # doesn't support 'U' mode, even in Python 2.3.) + try: + t = f.read() + finally: + f.close() + return (t.replace(u'\r\n', u'\n') + .replace(u'\r\x85', u'\n') + .replace(u'\r', u'\n') + .replace(u'\x85', u'\n') + .replace(u'\u2028', u'\n')) + + def write_text(self, text, encoding=None, errors='strict', linesep=os.linesep, append=False): + r""" Write the given text to this file. + + The default behavior is to overwrite any existing file; + to append instead, use the 'append=True' keyword argument. + + There are two differences between path.write_text() and + path.write_bytes(): newline handling and Unicode handling. + See below. + + Parameters: + + - text - str/unicode - The text to be written. + + - encoding - str - The Unicode encoding that will be used. + This is ignored if 'text' isn't a Unicode string. + + - errors - str - How to handle Unicode encoding errors. + Default is 'strict'. See help(unicode.encode) for the + options. 
This is ignored if 'text' isn't a Unicode + string. + + - linesep - keyword argument - str/unicode - The sequence of + characters to be used to mark end-of-line. The default is + os.linesep. You can also specify None; this means to + leave all newlines as they are in 'text'. + + - append - keyword argument - bool - Specifies what to do if + the file already exists (True: append to the end of it; + False: overwrite it.) The default is False. + + + --- Newline handling. + + write_text() converts all standard end-of-line sequences + ('\n', '\r', and '\r\n') to your platform's default end-of-line + sequence (see os.linesep; on Windows, for example, the + end-of-line marker is '\r\n'). + + If you don't like your platform's default, you can override it + using the 'linesep=' keyword argument. If you specifically want + write_text() to preserve the newlines as-is, use 'linesep=None'. + + This applies to Unicode text the same as to 8-bit text, except + there are three additional standard Unicode end-of-line sequences: + u'\x85', u'\r\x85', and u'\u2028'. + + (This is slightly different from when you open a file for + writing with fopen(filename, "w") in C or file(filename, 'w') + in Python.) + + + --- Unicode + + If 'text' isn't Unicode, then apart from newline handling, the + bytes are written verbatim to the file. The 'encoding' and + 'errors' arguments are not used and must be omitted. + + If 'text' is Unicode, it is first converted to bytes using the + specified 'encoding' (or the default encoding if 'encoding' + isn't specified). The 'errors' argument applies only to this + conversion. + + """ + if isinstance(text, unicode): + if linesep is not None: + # Convert all standard end-of-line sequences to + # ordinary newline characters. 
+ text = (text.replace(u'\r\n', u'\n') + .replace(u'\r\x85', u'\n') + .replace(u'\r', u'\n') + .replace(u'\x85', u'\n') + .replace(u'\u2028', u'\n')) + text = text.replace(u'\n', linesep) + if encoding is None: + encoding = sys.getdefaultencoding() + bytes = text.encode(encoding, errors) + else: + # It is an error to specify an encoding if 'text' is + # an 8-bit string. + assert encoding is None + + if linesep is not None: + text = (text.replace('\r\n', '\n') + .replace('\r', '\n')) + bytes = text.replace('\n', linesep) + + self.write_bytes(bytes, append) + + def lines(self, encoding=None, errors='strict', retain=True): + r""" Open this file, read all lines, return them in a list. + + Optional arguments: + encoding - The Unicode encoding (or character set) of + the file. The default is None, meaning the content + of the file is read as 8-bit characters and returned + as a list of (non-Unicode) str objects. + errors - How to handle Unicode errors; see help(str.decode) + for the options. Default is 'strict' + retain - If true, retain newline characters; but all newline + character combinations ('\r', '\n', '\r\n') are + translated to '\n'. If false, newline characters are + stripped off. Default is True. + + This uses 'U' mode in Python 2.3 and later. + """ + if encoding is None and retain: + f = self.open(_textmode) + try: + return f.readlines() + finally: + f.close() + else: + return self.text(encoding, errors).splitlines(retain) + + def write_lines(self, lines, encoding=None, errors='strict', + linesep=os.linesep, append=False): + r""" Write the given lines of text to this file. + + By default this overwrites any existing file at this path. + + This puts a platform-specific newline sequence on every line. + See 'linesep' below. + + lines - A list of strings. + + encoding - A Unicode encoding to use. This applies only if + 'lines' contains any Unicode strings. + + errors - How to handle errors in Unicode encoding. This + also applies only to Unicode strings. 
+ + linesep - The desired line-ending. This line-ending is + applied to every line. If a line already has any + standard line ending ('\r', '\n', '\r\n', u'\x85', + u'\r\x85', u'\u2028'), that will be stripped off and + this will be used instead. The default is os.linesep, + which is platform-dependent ('\r\n' on Windows, '\n' on + Unix, etc.) Specify None to write the lines as-is, + like file.writelines(). + + Use the keyword argument append=True to append lines to the + file. The default is to overwrite the file. Warning: + When you use this with Unicode data, if the encoding of the + existing data in the file is different from the encoding + you specify with the encoding= parameter, the result is + mixed-encoding data, which can really confuse someone trying + to read the file later. + """ + if append: + mode = 'ab' + else: + mode = 'wb' + f = self.open(mode) + try: + for line in lines: + isUnicode = isinstance(line, unicode) + if linesep is not None: + # Strip off any existing line-end and add the + # specified linesep string. + if isUnicode: + if line[-2:] in (u'\r\n', u'\x0d\x85'): + line = line[:-2] + elif line[-1:] in (u'\r', u'\n', + u'\x85', u'\u2028'): + line = line[:-1] + else: + if line[-2:] == '\r\n': + line = line[:-2] + elif line[-1:] in ('\r', '\n'): + line = line[:-1] + line += linesep + if isUnicode: + if encoding is None: + encoding = sys.getdefaultencoding() + line = line.encode(encoding, errors) + f.write(line) + finally: + f.close() + + def read_md5(self): + """ Calculate the md5 hash for this file. + + This reads through the entire file. + """ + f = self.open('rb') + try: + m = hashlib.md5() + while True: + d = f.read(8192) + if not d: + break + m.update(d) + finally: + f.close() + return m.digest() + + # --- Methods for querying the filesystem. 
+ + exists = os.path.exists + isdir = os.path.isdir + isfile = os.path.isfile + islink = os.path.islink + ismount = os.path.ismount + + if hasattr(os.path, 'samefile'): + samefile = os.path.samefile + + getatime = os.path.getatime + atime = property( + getatime, None, None, + """ Last access time of the file. """) + + getmtime = os.path.getmtime + mtime = property( + getmtime, None, None, + """ Last-modified time of the file. """) + + if hasattr(os.path, 'getctime'): + getctime = os.path.getctime + ctime = property( + getctime, None, None, + """ Creation time of the file. """) + + getsize = os.path.getsize + size = property( + getsize, None, None, + """ Size of the file, in bytes. """) + + if hasattr(os, 'access'): + def access(self, mode): + """ Return true if current user has access to this path. + + mode - One of the constants os.F_OK, os.R_OK, os.W_OK, os.X_OK + """ + return os.access(self, mode) + + def stat(self): + """ Perform a stat() system call on this path. """ + return os.stat(self) + + def lstat(self): + """ Like path.stat(), but do not follow symbolic links. """ + return os.lstat(self) + + def get_owner(self): + r""" Return the name of the owner of this file or directory. + + This follows symbolic links. + + On Windows, this returns a name of the form ur'DOMAIN\User Name'. + On Windows, a group can own a file or directory. + """ + if os.name == 'nt': + if win32security is None: + raise Exception("path.owner requires win32all to be installed") + desc = win32security.GetFileSecurity( + self, win32security.OWNER_SECURITY_INFORMATION) + sid = desc.GetSecurityDescriptorOwner() + account, domain, typecode = win32security.LookupAccountSid(None, sid) + return domain + u'\\' + account + else: + if pwd is None: + raise NotImplementedError("path.owner is not implemented on this platform.") + st = self.stat() + return pwd.getpwuid(st.st_uid).pw_name + + owner = property( + get_owner, None, None, + """ Name of the owner of this file or directory. 
""") + + if hasattr(os, 'statvfs'): + def statvfs(self): + """ Perform a statvfs() system call on this path. """ + return os.statvfs(self) + + if hasattr(os, 'pathconf'): + def pathconf(self, name): + return os.pathconf(self, name) + + + # --- Modifying operations on files and directories + + def utime(self, times): + """ Set the access and modified times of this file. """ + os.utime(self, times) + + def chmod(self, mode): + os.chmod(self, mode) + + if hasattr(os, 'chown'): + def chown(self, uid, gid): + os.chown(self, uid, gid) + + def rename(self, new): + os.rename(self, new) + + def renames(self, new): + os.renames(self, new) + + + # --- Create/delete operations on directories + + def mkdir(self, mode=0777): + os.mkdir(self, mode) + + def makedirs(self, mode=0777): + os.makedirs(self, mode) + + def rmdir(self): + os.rmdir(self) + + def removedirs(self): + os.removedirs(self) + + + # --- Modifying operations on files + + def touch(self): + """ Set the access/modified times of this file to the current time. + Create the file if it does not exist. + """ + fd = os.open(self, os.O_WRONLY | os.O_CREAT, 0666) + os.close(fd) + os.utime(self, None) + + def remove(self): + os.remove(self) + + def unlink(self): + os.unlink(self) + + + # --- Links + + if hasattr(os, 'link'): + def link(self, newpath): + """ Create a hard link at 'newpath', pointing to this file. """ + os.link(self, newpath) + + if hasattr(os, 'symlink'): + def symlink(self, newlink): + """ Create a symbolic link at 'newlink', pointing here. """ + os.symlink(self, newlink) + + if hasattr(os, 'readlink'): + def readlink(self): + """ Return the path to which this symbolic link points. + + The result may be an absolute or a relative path. + """ + return self.__class__(os.readlink(self)) + + def readlinkabs(self): + """ Return the path to which this symbolic link points. + + The result is always an absolute path. 
+ """ + p = self.readlink() + if p.isabs(): + return p + else: + return (self.parent / p).abspath() + + + # --- High-level functions from shutil + + copyfile = shutil.copyfile + copymode = shutil.copymode + copystat = shutil.copystat + copy = shutil.copy + copy2 = shutil.copy2 + copytree = shutil.copytree + if hasattr(shutil, 'move'): + move = shutil.move + rmtree = shutil.rmtree + + + # --- Special stuff from os + + if hasattr(os, 'chroot'): + def chroot(self): + os.chroot(self) + + if hasattr(os, 'startfile'): + def startfile(self): + os.startfile(self) + Modified: python-commons/trunk/src/commons/sqlhash.py =================================================================== --- python-commons/trunk/src/commons/sqlhash.py 2009-05-15 16:47:28 UTC (rev 1420) +++ python-commons/trunk/src/commons/sqlhash.py 2009-05-15 16:48:03 UTC (rev 1421) @@ -1,5 +1,5 @@ -#!/usr/bin/env python3.0 -# From <http://code.activestate.com/recipes/576638/> +# Based on <http://code.activestate.com/recipes/576638/>. + ''' Dbm based on sqlite -- Needed to support shelves Key and values are always stored as bytes. This means that when strings are @@ -16,9 +16,9 @@ __all__ = ['error', 'open'] -import sqlite3 -import collections -from operator import itemgetter +import sqlite3, itertools, collections, sys, shelve, cPickle, threading +if sys.version_info < (3,0): + from itertools import imap as map error = sqlite3.DatabaseError @@ -31,7 +31,7 @@ # w -- open existing # r -- readonly - MAKE_SHELF = 'CREATE TABLE IF NOT EXISTS shelf (key TEXT NOT NULL, value TEXT NOT NULL)' + MAKE_SHELF = 'CREATE TABLE IF NOT EXISTS shelf (key BLOB NOT NULL, value BLOB NOT NULL)' MAKE_INDEX = 'CREATE UNIQUE INDEX IF NOT EXISTS keyndx ON shelf (key)' self.conn = sqlite3.connect(filename) self.conn.text_factory = bytes @@ -57,30 +57,31 @@ def __contains__(self, key): GET_ITEM = 'SELECT value FROM shelf WHERE key = ?' 
- return self.conn.execute(GET_ITEM, (key,)).fetchone() is not None + return self.conn.execute(GET_ITEM, (sqlite3.Binary(key),)).fetchone() is not None def __getitem__(self, key): GET_ITEM = 'SELECT value FROM shelf WHERE key = ?' - item = self.conn.execute(GET_ITEM, (key,)).fetchone() + item = self.conn.execute(GET_ITEM, (sqlite3.Binary(key),)).fetchone() if item is None: raise KeyError(key) return item[0] def __setitem__(self, key, value): ADD_ITEM = 'REPLACE INTO shelf (key, value) VALUES (?,?)' - self.conn.execute(ADD_ITEM, (key, value)) + self.conn.execute(ADD_ITEM, (sqlite3.Binary(key), sqlite3.Binary(value))) self.conn.commit() def __delitem__(self, key): if key not in self: raise KeyError(key) DEL_ITEM = 'DELETE FROM shelf WHERE key = ?' - self.conn.execute(DEL_ITEM, (key,)) + self.conn.execute(DEL_ITEM, (sqlite3.Binary(key),)) self.conn.commit() def update(self, items=(), **kwds): if isinstance(items, collections.Mapping): items = items.items() + items = ((sqlite3.Binary(k),sqlite3.Binary(v)) for k,v in items) UPDATE_ITEMS = 'REPLACE INTO shelf (key, value) VALUES (?, ?)' self.conn.executemany(UPDATE_ITEMS, items) self.conn.commit() @@ -110,13 +111,13 @@ def __iter__(self): GET_KEYS = 'SELECT key FROM shelf ORDER BY ROWID' - return map(itemgetter(0), self._mapping.conn.cursor().execute(GET_KEYS)) + return (str(row[0]) for row in self._mapping.conn.cursor().execute(GET_KEYS)) class SQLhashValuesView(collections.ValuesView, ListRepr): def __iter__(self): GET_VALUES = 'SELECT value FROM shelf ORDER BY ROWID' - return map(itemgetter(0), self._mapping.conn.cursor().execute(GET_VALUES)) + return (str(row[0]) for row in self._mapping.conn.cursor().execute(GET_VALUES)) class SQLhashItemsView(collections.ValuesView, ListRepr): @@ -129,7 +130,54 @@ return SQLhash(file) return SQLhash() +class Shelf(shelve.Shelf): + def __setitem__(self, key, value): + if self.writeback: self.cache[key] = value + else: self.dict[key] = cPickle.dumps(value, self._protocol) + def 
__delitem__(self, key): + try: + del self.dict[key] + except: + del self.cache[key] + else: + try: del self.cache[key] + except KeyError: pass + def sync(self): + if self.cache: + self.dict.update( (k, cPickle.dumps(v, self._protocol)) for k,v in self.cache.items() ) + self.cache.clear() +# Attempting to make a *safe* background-writeback Shelf is hard. + +#class Shelf(shelve.Shelf): +# def __init__(self, *args, **kwargs): +# shelve.Shelf.__init__(self, *args, **kwargs) +# threading.Thread(target = self.syncer_proc).start() +# self.syncer_queue = Queue.Queue() +# def syncer_proc(self): +# while True: +# cache = self.syncer_queue.get() +# if cache is None: break +# self.dict.update( (k, cPickle.dumps(v, self._protocol)) for k,v in self.cache.items() ) +# def __setitem__(self, key, value): +# if self.writeback: self.cache[key] = value +# else: self.dict[key] = cPickle.dumps(value, self._protocol) +# def __delitem__(self, key): +# try: +# del self.dict[key] +# except: +# del self.cache[key] +# else: +# try: del self.cache[key] +# except KeyError: pass +# def sync(self): +# if self.cache: +# self.syncer_queue.push(self.cache) +# self.cache = {} +# with self.syncer_lock: +# while self.syncer_busy: self.syncer_free.wait() +# self.dict.update( (k, cPickle.dumps(v, self._protocol)) for k,v in self.cache.items() ) + if __name__ == '__main__': for d in SQLhash(), SQLhash('example'): print(list(d), "start")
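The sqlhash commit above switches the shelf table to BLOB columns and wraps keys and values in sqlite3.Binary so that raw bytes round-trip through SQLite. A minimal, self-contained sketch of that pattern (the class name and structure here are illustrative, not the committed API):

```python
import sqlite3

class SQLDict:
    """Minimal sqlite3-backed mapping storing keys and values as BLOBs,
    mirroring the 'shelf' table layout used in the commit above."""

    def __init__(self, filename=':memory:'):
        self.conn = sqlite3.connect(filename)
        # Same schema as the commit: BLOB key/value plus a unique key index
        # so that REPLACE INTO behaves like dict assignment.
        self.conn.execute(
            'CREATE TABLE IF NOT EXISTS shelf '
            '(key BLOB NOT NULL, value BLOB NOT NULL)')
        self.conn.execute(
            'CREATE UNIQUE INDEX IF NOT EXISTS keyndx ON shelf (key)')

    def __setitem__(self, key, value):
        self.conn.execute('REPLACE INTO shelf (key, value) VALUES (?, ?)',
                          (sqlite3.Binary(key), sqlite3.Binary(value)))
        self.conn.commit()

    def __getitem__(self, key):
        row = self.conn.execute('SELECT value FROM shelf WHERE key = ?',
                                (sqlite3.Binary(key),)).fetchone()
        if row is None:
            raise KeyError(key)
        return row[0]

d = SQLDict()
d[b'spam'] = b'eggs'
```

Because of the unique index, assigning to an existing key overwrites the old row rather than inserting a duplicate, which is what makes the mapping semantics hold.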
From: <yan...@us...> - 2009-05-15 16:47:42
Revision: 1420 http://assorted.svn.sourceforge.net/assorted/?rev=1420&view=rev Author: yangzhang Date: 2009-05-15 16:47:28 +0000 (Fri, 15 May 2009) Log Message: ----------- first major update in a long time - switched to new sqlhash storage system - added --no-fetch, --refresh Modified Paths: -------------- mailing-list-filter/trunk/src/mlf.py Modified: mailing-list-filter/trunk/src/mlf.py =================================================================== --- mailing-list-filter/trunk/src/mlf.py 2009-05-15 07:01:14 UTC (rev 1419) +++ mailing-list-filter/trunk/src/mlf.py 2009-05-15 16:47:28 UTC (rev 1420) @@ -1,5 +1,7 @@ #!/usr/bin/env python +# See RFC 3501. + """ Given a Gmail IMAP mailbox, star all messages in which you were a participant (either a sender or an explicit recipient in To: or Cc:), where thread grouping @@ -7,43 +9,36 @@ """ from __future__ import with_statement -from collections import defaultdict -from email import message_from_string -from getpass import getpass -from imaplib import IMAP4_SSL -from argparse import ArgumentParser from path import path -from re import match from functools import partial -from itertools import count -from commons.decs import pickle_memoized -from commons.files import cleanse_filename, soft_makedirs from commons.log import * -from commons.misc import default_if_none, seq -from commons.networking import logout -from commons.seqs import concat, grouper -from commons.startup import run_main from contextlib import closing -import logging -from commons import log +import getpass, logging, shelve, email, re, os, imaplib, itertools, argparse, collections +from commons import log, startup, seqs, networking, files, sqlhash info = partial(log.info, 'main') debug = partial(log.debug, 'main') warning = partial(log.warning, 'main') error = partial(log.error, 'main') die = partial(log.die, 'main') +exception = partial(log.exception, 'main') -def thread_dfs(msg, tid, tid2msgs): +def opendb(dbpath): + return 
sqlhash.Shelf(sqlhash.SQLhash(dbpath, flags = 'w'), + protocol = 2, writeback = True) + +def thread_dfs(msg, tid, mid2msg, tid2msgs): assert msg.tid is None msg.tid = tid tid2msgs[tid].append(msg) - for ref in msg.refs: - if ref.tid is None: - thread_dfs(ref, tid, tid2msgs) + for refmid in msg.refs: + refmsg = mid2msg[refmid] + if refmsg.tid is None: + thread_dfs(refmsg, tid, mid2msg, tid2msgs) else: - assert ref.tid == tid + assert refmsg.tid == tid -def getmail(imap): +def getmaxuid(imap): info( 'finding max UID' ) # We use UIDs rather than the default of sequence numbers because UIDs are # guaranteed to be persistent across sessions. This means that we can, for @@ -52,8 +47,10 @@ ok, [uids] = imap.uid('SEARCH', None, 'ALL') maxuid = int( uids.split()[-1] ) del uids + return maxuid - info( 'actually fetching the messages in chunks up to max', maxuid ) +def getmail(imap, minuid, maxuid): + info( 'fetching messages', minuid, 'to', maxuid ) # The syntax/fields of the FETCH command is documented in RFC 2060. 
Also, # this article contains a brief overview: # http://www.devshed.com/c/a/Python/Python-Email-Libraries-part-2-IMAP/3/ @@ -61,19 +58,32 @@ query = '(FLAGS BODY.PEEK[HEADER.FIELDS ' \ '(Message-ID References In-Reply-To From To Cc Subject)])' step = 1000 - return list( concat( - seq( lambda: info('fetching', start, 'to', start + step - 1), - lambda: imap.uid('FETCH', '%d:%d' % (start, start + step - 1), - query)[1] ) - for start in xrange(1, maxuid + 1, step) ) ) + for start in xrange(minuid, maxuid + 1, step): + range = '%d:%d' % (start, min(maxuid, start + step - 1)) + while True: + try: + info('fetching', range) + ok, chunk = imap.uid('FETCH', range, query) + except imaplib.abort, ex: + error('fetch failed:', ex.message) + if 'System Error' not in ex.message: raise + except: + exception('fetch failed') + raise + else: + break + for row in chunk: + yield row def main(argv): - p = ArgumentParser(description = __doc__) + p = argparse.ArgumentParser(description = __doc__) p.add_argument('--credfile', default = path( '~/.mlf.auth' ).expanduser(), help = """File containing your login credentials, with the username on the first line and the password on the second line. 
Ignored iff --prompt.""") p.add_argument('--cachedir', default = path( '~/.mlf.cache' ).expanduser(), help = "Directory to use for caching our data.") + p.add_argument('--refresh', action = 'store_true', + help = "Re-fetch all messages, wiping out existing cache.") p.add_argument('--prompt', action = 'store_true', help = "Interactively prompt for the username and password.") p.add_argument('--pretend', action = 'store_true', @@ -83,6 +93,8 @@ help = "Do not mark newly relevant threads as unread.") p.add_argument('--no-mark-seen', action = 'store_true', help = "Do not mark newly irrelevant threads as read.") + p.add_argument('--no-fetch', action = 'store_true', + help = "Do not fetch new messages; just process already-fetched messages.") p.add_argument('--debug', action = 'append', default = [], help = """Enable logging for messages of the given flags. Flags include: refs (references to missing Message-IDs), dups (duplicate Message-IDs), @@ -101,136 +113,147 @@ print "username:", cfg.user = raw_input() print "password:", - cfg.passwd = getpass() + cfg.passwd = getpass.getpass() else: with file(cfg.credfile) as f: [cfg.user, cfg.passwd] = map(lambda x: x.strip('\r\n'), f.readlines()) try: - m = match( r'(?P<host>[^:/]+)(:(?P<port>\d+))?(/(?P<mailbox>.+))?$', + m = re.match( r'(?P<host>[^:/]+)(:(?P<port>\d+))?(/(?P<mailbox>.+))?$', cfg.server ) cfg.host = m.group('host') - cfg.port = int( default_if_none(m.group('port'), 993) ) - cfg.mailbox = default_if_none(m.group('mailbox'), 'INBOX') + cfg.port = int( m.group('port') or 993 ) + cfg.mailbox = m.group('mailbox') or 'INBOX' except: p.error('Need to specify the server in the correct format.') - soft_makedirs(cfg.cachedir) + files.soft_makedirs(cfg.cachedir) - with logout(IMAP4_SSL(cfg.host, cfg.port)) as imap: - imap.login(cfg.user, cfg.passwd) + info('connecting and logging in') + + if True: + ###with networking.logout(imaplib.IMAP4_SSL(cfg.host, cfg.port)) as imap: + ###imap.login(cfg.user, cfg.passwd) # Close is
only valid in the authenticated state. - with closing(imap) as imap: - # Select the main mailbox (INBOX). - imap.select(cfg.mailbox) + ###with closing(imap) as imap: - # Fetch message IDs, references, and senders. - xs = pickle_memoized \ - (lambda imap: cfg.cachedir / cleanse_filename(cfg.sender)) \ - (getmail) \ - (imap) + info('selecting mailbox') + ###imap.select(cfg.mailbox) - log.debug('fetched', xs) + dbpath = cfg.cachedir / files.cleanse_filename(cfg.sender) - info('building message-id map and determining the set of messages sent ' - 'by you or addressed to you (the "source set")') + # + # Fetch message IDs, references, and senders into persistent store. + # - srcs = [] - mid2msg = {} - # Every second item is just a closing paren. - # Example data: - # [('13300 (BODY[HEADER.FIELDS (Message-ID References In-Reply-To)] {67}', - # 'Message-ID: <mai...@py...>\r\n\r\n'), - # ')', - # ('13301 (BODY[HEADER.FIELDS (Message-ID References In-Reply-To)] {59}', - # 'Message-Id: <200...@hv...>\r\n\r\n'), - # ')', - # ('13302 (BODY[HEADER.FIELDS (Message-ID References In-Reply-To)] {92}', - # 'Message-ID: <C43EAFC0.2E3AE%ni...@ya...>\r\nIn-Reply-To: <481...@gm...>\r\n\r\n')] - for (envelope, data), paren in grouper(2, xs): - # Parse the body. - msg = message_from_string(data) + if cfg.refresh: + try: os.remove(dbpath) + except: pass - # Parse the envelope. - m = match( - r"(?P<seqno>\d+) \(UID (?P<uid>\d+) FLAGS \((?P<flags>[^)]+)\)", - envelope ) - msg.seqno = m.group('seqno') - msg.uid = m.group('uid') - msg.flags = m.group('flags').split() + if not cfg.no_fetch: + with closing(opendb(dbpath)) as mid2msg: - # Prepare a container for references to other msgs, and initialize the - # thread ID. - msg.refs = [] - msg.tid = None + minuid = mid2msg.get('maxuid', 1) + maxuid = getmaxuid(imap) - # Add these to the map. 
- if msg['Message-ID'] in mid2msg: - log.warning( 'dups', 'duplicate message IDs:', - msg['Message-ID'], msg['Subject'] ) - mid2msg[ msg['Message-ID'] ] = msg + # Every second item is just a closing paren. + # Example data: + # [('13300 (BODY[HEADER.FIELDS (Message-ID References In-Reply-To)] {67}', + # 'Message-ID: <mai...@py...>\r\n\r\n'), + # ')', + # ('13301 (BODY[HEADER.FIELDS (Message-ID References In-Reply-To)] {59}', + # 'Message-Id: <200...@hv...>\r\n\r\n'), + # ')', + # ('13302 (BODY[HEADER.FIELDS (Message-ID References In-Reply-To)] {92}', + # 'Message-ID: <C43EAFC0.2E3AE%ni...@ya...>\r\nIn-Reply-To: <481...@gm...>\r\n\r\n')] + pat = re.compile(r"(?P<seqno>\d+) \(UID (?P<uid>\d+) FLAGS \((?P<flags>[^)]+)\)") + for i, ((envelope, data), paren) in enumerate(seqs.grouper(2, getmail(imap, minuid, maxuid))): + # Parse the body. + msg = email.message_from_string(data) - # Add to "srcs" set if sent by us or addressed to us. - if ( cfg.sender in default_if_none( msg['From'], '' ) or - cfg.sender in default_if_none( msg['To'], '' ) or - cfg.sender in default_if_none( msg['Cc'], '' ) ): - srcs.append( msg ) + # Parse the envelope. + m = pat.match(envelope) + if m is None: raise Exception('envelope: %r' % envelope) + msg.seqno = m.group('seqno') + msg.uid = m.group('uid') + msg.flags = m.group('flags').split() - info( 'constructing undirected graph' ) + # Prepare a container for references to other msgs, and initialize the + # thread ID. + msg.refs = set() + msg.tid = None - for mid, msg in mid2msg.iteritems(): - # Extract any references. - irt = default_if_none( msg.get_all('In-Reply-To'), [] ) - refs = default_if_none( msg.get_all('References'), [] ) - refs = set( ' '.join( irt + refs ).replace('><', '> <').split() ) + # Add these to the map. + if msg['Message-ID'] in mid2msg: + log.warning( 'dups', 'duplicate message IDs:', + msg['Message-ID'], msg['Subject'] ) + mid2msg[ msg['Message-ID'] ] = msg - # Connect nodes in graph bidirectionally. 
Ignore references to MIDs - # that don't exist. - for ref in refs: - try: - refmsg = mid2msg[ref] - # We can use lists/append (not worry about duplicates) because the - # original sources should be acyclic. If a -> b, then there is no b -> - # a, so when crawling a we can add a <-> b without worrying that later - # we may re-add b -> a. - msg.refs.append(refmsg) - refmsg.refs.append(msg) - except: - log.warning( 'refs', ref ) + # Periodically sync to disk. + if len(mid2msg.cache) > 1000: mid2msg.sync() - info('finding connected components (grouping the messages into threads)') + mid2msg['maxuid'] = maxuid - tids = count() - tid2msgs = defaultdict(list) - for mid, msg in mid2msg.iteritems(): - if msg.tid is None: - thread_dfs(msg, tids.next(), tid2msgs) + with closing(opendb(dbpath)) as mid2msg: - info( 'starring the relevant threads, in which I am a participant' ) + info( 'maxuid', mid2msg['maxuid'] ) - rel_tids = set() - for srcmsg in srcs: - if srcmsg.tid not in rel_tids: - rel_tids.add(srcmsg.tid) - for msg in tid2msgs[srcmsg.tid]: - if r'\Flagged' not in msg.flags: - log.info( 'star', '\n', msg ) - if not cfg.pretend: - imap.uid('STORE', msg.uid, '+FLAGS', r'\Flagged') - if not cfg.no_mark_unseen and r'\Seen' in msg.flags: - imap.uid('STORE', msg.uid, '-FLAGS', r'\Seen') + info( 'constructing undirected graph' ) - info( 'unstarring irrelevant threads, in which I am not a participant' ) + for i, (mid, msg) in enumerate(mid2msg.iteritems()): + # Extract any references. 
+ irt = msg.get_all('In-Reply-To', []) + refs = msg.get_all('References', []) + msg.refs.update( ' '.join( irt + refs ).replace('><', '> <').split() ) - all_tids = set( tid2msgs.iterkeys() ) - irrel_tids = all_tids - rel_tids - for tid in irrel_tids: - for msg in tid2msgs[tid]: - if r'\Flagged' in msg.flags: - log.info( 'unstar', '\n', msg ) - if not cfg.pretend: - imap.uid('STORE', msg.uid, '-FLAGS', r'\Flagged') - if not cfg.no_mark_seen and r'\Seen' not in msg.flags: - imap.uid('STORE', msg.uid, '+FLAGS', r'\Seen') + # Connect nodes in graph bidirectionally. Ignore references to MIDs + # that don't exist. + for ref in msg.refs: + try: mid2msg[ref].refs.add(msg['Message-ID']) + except KeyError: log.warning( 'no message with id', ref ) -run_main() + # Periodically sync to disk. + if len(mid2msg.cache) > 10000: + info( 'syncing; now at', i ) + mid2msg.sync() + + info('looking for relevant (grouping the messages into threads)') + + # Look for messages sent by us or addressed to us, and add their + # connected components into tid2msgs. 
+ tids = itertools.count() + tid2msgs = collections.defaultdict(list) + for mid, msg in mid2msg.iteritems(): + if ( cfg.sender in msg.get('From', '' ) or + cfg.sender in msg.get('To', '' ) or + cfg.sender in msg.get('Cc', '' ) ): + thread_dfs(msg, tids.next(), mid2msg, tid2msgs) + + info( 'starring the relevant threads, in which I am a participant' ) + + rel_tids = set() + for srcmsg in srcs: + if srcmsg.tid not in rel_tids: + rel_tids.add(srcmsg.tid) + for msg in tid2msgs[srcmsg.tid]: + if r'\Flagged' not in msg.flags: + log.info( 'star', '\n', msg ) + if not cfg.pretend: + imap.uid('STORE', msg.uid, '+FLAGS', r'\Flagged') + if not cfg.no_mark_unseen and r'\Seen' in msg.flags: + imap.uid('STORE', msg.uid, '-FLAGS', r'\Seen') + + info( 'unstarring irrelevant threads, in which I am not a participant' ) + + all_tids = set( tid2msgs.iterkeys() ) + irrel_tids = all_tids - rel_tids + for tid in irrel_tids: + for msg in tid2msgs[tid]: + if r'\Flagged' in msg.flags: + log.info( 'unstar', '\n', msg ) + if not cfg.pretend: + imap.uid('STORE', msg.uid, '-FLAGS', r'\Flagged') + if not cfg.no_mark_seen and r'\Seen' not in msg.flags: + imap.uid('STORE', msg.uid, '+FLAGS', r'\Seen') + +startup.run_main() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
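The patched `getmail` above fetches UIDs in fixed-size chunks and retries on transient `imaplib` aborts. As a minimal sketch of those two ideas (the helper names here are hypothetical, not part of the committed script):

```python
import imaplib

def uid_ranges(minuid, maxuid, step=1000):
    """Yield IMAP UID range strings like '1:1000', clamped to maxuid.

    Mirrors the chunking in the patched getmail(); each string is the
    range argument passed to imap.uid('FETCH', range, query).
    """
    for start in range(minuid, maxuid + 1, step):
        yield '%d:%d' % (start, min(maxuid, start + step - 1))

def fetch_with_retry(fetch, range_, max_tries=3):
    """Retry a fetch on 'System Error' aborts, as the patch does.

    `fetch` is any callable taking a UID range string (hypothetical
    stand-in for the imap.uid('FETCH', ...) call).
    """
    for _ in range(max_tries):
        try:
            return fetch(range_)
        except imaplib.IMAP4.abort as ex:
            # Only transient server-side errors are worth retrying.
            if 'System Error' not in str(ex):
                raise
    raise RuntimeError('gave up fetching %s' % range_)
```

Note the clamping with `min(maxuid, ...)`: without it the last chunk would request UIDs past the mailbox's highest UID.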
From: <yan...@us...> - 2009-05-15 07:01:23
Revision: 1419 http://assorted.svn.sourceforge.net/assorted/?rev=1419&view=rev Author: yangzhang Date: 2009-05-15 07:01:14 +0000 (Fri, 15 May 2009) Log Message: ----------- - updated rec_snapshot to use special disk-based reading - snapshots now only update whatever is necessary on disk (whatever changed) Modified Paths: -------------- ydb/trunk/src/main.clamp.lzz ydb/trunk/src/rectpcc.clamp.lzz ydb/trunk/src/tpcc.clamp.lzz Modified: ydb/trunk/src/main.clamp.lzz =================================================================== --- ydb/trunk/src/main.clamp.lzz 2009-05-15 06:58:54 UTC (rev 1418) +++ ydb/trunk/src/main.clamp.lzz 2009-05-15 07:01:14 UTC (rev 1419) @@ -38,7 +38,6 @@ typedef tuple<sized_array<char>, char*, char*> chunk; typedef commons::array<char> recovery_t; -typedef commons::array<char> wal_chunk; // Configuration. Modified: ydb/trunk/src/rectpcc.clamp.lzz =================================================================== --- ydb/trunk/src/rectpcc.clamp.lzz 2009-05-15 06:58:54 UTC (rev 1418) +++ ydb/trunk/src/rectpcc.clamp.lzz 2009-05-15 07:01:14 UTC (rev 1419) @@ -23,6 +23,14 @@ void rec_snapshot(int &seqno) { + long long before_read = current_time_millis(); + commons::array<char> arr = g_tables->read(snapshot_path); + size_t len = arr.size(); + char *rawbuf = arr.release(); + long long after_read = current_time_millis(); + showdatarate("read from disk", len, after_read - before_read); + +#if 0 // Prepare buffer. closingfd fd(checknnegerr(open(snapshot_path.c_str(), O_RDONLY))); size_t fsz = file_size(fd); @@ -31,15 +39,17 @@ // Read. long long before_read = current_time_millis(); - checkeqnneg(read(fd, rawbuf, fsz), static_cast<ssize_t>(fsz)); + checkeqnneg(read(fd, rawbuf, fsz), ssize_t(fsz)); long long after_read = current_time_millis(); showdatarate("read from disk", fsz, after_read - before_read); // Sanity. tpcc_recovery_header &hdr = *reinterpret_cast<tpcc_recovery_header*>(rawbuf); checkeq(hdr.len, fsz); +#endif // Deserialize. 
+ tpcc_recovery_header &hdr = *reinterpret_cast<tpcc_recovery_header*>(rawbuf); long long before_deser = current_time_millis(); array<char> buf(rawbuf, hdr.len); g_tables->deser(0, 1, hdr, buf); Modified: ydb/trunk/src/tpcc.clamp.lzz =================================================================== --- ydb/trunk/src/tpcc.clamp.lzz 2009-05-15 06:58:54 UTC (rev 1418) +++ ydb/trunk/src/tpcc.clamp.lzz 2009-05-15 07:01:14 UTC (rev 1419) @@ -31,6 +31,8 @@ using namespace commons; using namespace std; +typedef commons::array<char> summary_t; + int snapshot_interval, pct_read_only; string snapshot_path; bool do_rec_snapshot, do_wal; @@ -481,7 +483,15 @@ st_channel<chunk> &backlog, int init_seqno, int mypos, int nnodes) { + // + // Snapshot prep. + // + snapshot_writer_busy.set(false); + tpcc_recovery_header *tmphdr = new tpcc_recovery_header; + bzero(tmphdr, sizeof *tmphdr); + last_summary.reset(reinterpret_cast<char*>(tmphdr), sizeof *tmphdr); + bool caught_up = init_seqno == 0; // Means we're currently ignoring the incoming txns until we see a fail-ack // from the leader. 
@@ -554,6 +564,7 @@ } __ref(send_states).push(recovery_t()); snapshots.push(recovery_t()); + __ref(wal).sync(); __ref(w).mark_and_flush(); }); @@ -696,18 +707,22 @@ cout << "serializing snapshot, db state is now at seqno " << seqno << ":" << endl; g_tables->show(); - recovery_t recovery = g_tables->ser(0, 1, seqno); - showdatarate("serialized snapshot", recovery.size(), - current_time_millis() - start_time); + recovery_t recovery = g_tables->ser_partial(last_summary, 0, 1, seqno); + last_summary = g_tables->summarize(); + size_t len = reinterpret_cast<tpcc_recovery_header*>(recovery.get())->len; + showdatarate("serialized snapshot", len, current_time_millis() - start_time); snapshots.push(move(recovery)); snapshot_writer_busy.set(true); } } namespace { +summary_t last_summary; + concurrent_queue<recovery_t> snapshots; atomic<bool> snapshot_writer_busy; +typedef pair<commons::array<char>, size_t> wal_chunk; concurrent_queue<wal_chunk> wal_chunks; atomic<bool> wal_writer_busy; @@ -720,13 +735,13 @@ head_ += len; } void sync() { - wal_chunk tmp_(buf_size); + commons::array<char> tmp_(buf_size); swap(data_, tmp_); - wal_chunks.push_cond(move(tmp_), 0); + wal_chunks.push_cond(make_pair(move(tmp_), head_ - data_), 0); head_ = data_; } private: - wal_chunk data_; + commons::array<char> data_; char *head_; }; } @@ -738,8 +753,8 @@ closingfd fd(checknnegerr(creat("wal", 0644))); while (true) { wal_chunk chunk = wal_chunks.take(); - if (chunk.get() == nullptr) break; - checkeqnneg(write(fd, chunk, chunk.size()), ssize_t(chunk.size())); + if (chunk.first.get() == nullptr) break; + checkeqnneg(write(fd, chunk.first, chunk.second), ssize_t(chunk.second)); fdatasync(fd); } } @@ -748,18 +763,18 @@ snapshot_writer() { cout << "snapshot writer starting" << endl; + g_tables->remove(snapshot_path); while (true) { recovery_t rec = snapshots.take(); cout << "took one" << endl; if (rec.get() == nullptr) break; long long start_time = current_time_millis(); - { - ofstream 
of((snapshot_path + ".tmp").c_str()); - of.write(rec.get(), rec.size()); - } + size_t len = reinterpret_cast<tpcc_recovery_header*>(rec.get())->len; + g_tables->write(snapshot_path, rec); +#if 0 check0x(rename((snapshot_path + ".tmp").c_str(), snapshot_path.c_str())); - showdatarate("wrote snapshot", rec.size(), - current_time_millis() - start_time); +#endif + showdatarate("wrote snapshot", len, current_time_millis() - start_time); snapshot_writer_busy.set(false); } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
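The WAL change in r1419 turns each queued chunk into a (buffer, used-length) pair: `sync()` swaps in a fresh buffer and hands the old one, together with how many bytes were actually written into it, to the writer thread. A sketch of that double-buffered hand-off in Python (the original is C++; class and attribute names here are hypothetical):

```python
import queue

class WalBuffer:
    """Double-buffered write-ahead log front end.

    sync() swaps in a fresh buffer and pushes (old_buffer, used_bytes)
    onto a queue, so the consuming writer thread knows exactly how much
    of the buffer to write and fsync, mirroring the wal_chunk pair.
    """
    def __init__(self, buf_size, chunks):
        self.buf_size = buf_size
        self.data = bytearray(buf_size)
        self.head = 0            # next free offset in self.data
        self.chunks = chunks     # queue consumed by the writer thread

    def log(self, payload):
        assert self.head + len(payload) <= self.buf_size
        self.data[self.head:self.head + len(payload)] = payload
        self.head += len(payload)

    def sync(self):
        old, used = self.data, self.head
        self.data, self.head = bytearray(self.buf_size), 0
        self.chunks.put((old, used))
```

Passing the length explicitly matters because the buffer is fixed-size: writing the whole buffer (as the pre-patch code effectively did with `chunk.size()`) would flush stale bytes past `head`.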
From: <yan...@us...> - 2009-05-15 06:59:07
Revision: 1418 http://assorted.svn.sourceforge.net/assorted/?rev=1418&view=rev Author: yangzhang Date: 2009-05-15 06:58:54 +0000 (Fri, 15 May 2009) Log Message: ----------- - added ser_partial, deser_partial, and {remove, write, read} for file ser/deser to tpcctables Modified Paths: -------------- ydb/trunk/src/tpcc/tpcctables.cc.cog ydb/trunk/src/tpcc/tpcctables.h Modified: ydb/trunk/src/tpcc/tpcctables.cc.cog =================================================================== --- ydb/trunk/src/tpcc/tpcctables.cc.cog 2009-05-15 06:57:32 UTC (rev 1417) +++ ydb/trunk/src/tpcc/tpcctables.cc.cog 2009-05-15 06:58:54 UTC (rev 1418) @@ -1,6 +1,5 @@ //[[[cog // allfields = ''' -// items // warehouses // stock // districts @@ -9,6 +8,7 @@ // orderlines // neworders // history +// items // '''.split() // treepairs = [ p.split('/') for p in ''' // warehouses/Warehouse @@ -37,17 +37,21 @@ #include "tpcctables.h" #include <algorithm> -#include <limits> -#include <vector> - +#include <boost/foreach.hpp> #include <commons/assert.h> #include <commons/check.h> +#include <commons/closing.h> +#include <commons/files.h> #include <commons/memory.h> #include <commons/versioned_heap.h> +#include <fcntl.h> #include <iostream> +#include <limits> +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> +#include <vector> -#include <boost/foreach.hpp> - #ifdef DO_DUMP #include <fstream> // XXX #include <boost/lexical_cast.hpp> // XXX @@ -1036,9 +1040,10 @@ #if 0 void compare_summaries(const commons::array<char> &a, const commons::array<char> &b) { - raw_reader r(a); - tpcc_recovery_header hdr; - r.read(hdr); + raw_reader r(a), s(b); + tpcc_recovery_header ahdr, bhdr; + r.read(ahdr); + s.read(bhdr); //[[[cog // for name, struct in heappairs: // cog.outl(r''' @@ -1051,11 +1056,245 @@ //]]] //[[[end]]] } +#endif -TPCCTables::restore_partial(sizeof hdr) const +/** + * \param[in] summary Another heap's summary. 
+ */ +commons::array<char> +TPCCTables::ser_partial(const commons::array<char> &summary, + int mypos, int nnodes, int seqno) const { - restore_partial + mypos = nnodes; // TODO use + using namespace std; + + // + // Deserialize summary header. + // + + raw_reader sreader(summary); + tpcc_recovery_header shdr; + sreader.read(shdr); + + // + // Serialize header. + // + + tpcc_recovery_header hdr; + bzero(&hdr, sizeof hdr); + hdr.seqno = seqno; + size_t metalen = 0, datalen = 0; + //[[[cog + // for name in allfields: + // cog.outl(r'hdr.n%s = uint32_t(%s_.size());' % (name, name)) + // for name, struct in heappairs: + // cog.outl(r''' + // versioned_heap<%(struct)s> &heap_%(name)s = heaps<%(struct)s>::heap; + // metalen += heap_%(name)s.metasize(); + // datalen += datasize(heap_%(name)s); + // ''' % {'name': name, 'struct': struct}) + //]]] + //[[[end]]] + hdr.headlen = uint32_t(whole_units(sizeof hdr + metalen, pgsz()) * pgsz()); + hdr.len = uint32_t(hdr.headlen + datalen + sizeof(Item) * hdr.nitems); + void *raw_arr; + check0x(posix_memalign(&raw_arr, pgsz(), hdr.len)); + + // + // Serialize metas and datas. + // + + commons::array<char> arr(reinterpret_cast<char*>(raw_arr), hdr.len); + raw_writer writer(arr), dwriter(arr + hdr.headlen); + writer.write(hdr); + datalen = 0; + //[[[cog + // for name, struct in heappairs: + // cog.outl(r''' + // cout << "partially serializing %(name)s..." 
<< flush; + // heap_%(name)s.sermeta(writer.ptr()); + // writer.skip(heap_%(name)s.metasize()); + // hdr.n%(name)s = uint32_t(serdata_partial(heap_%(name)s, dwriter.ptr(), sreader, shdr.n%(name)s)); + // dwriter.skip(hdr.n%(name)s * pgsz()); + // cout << "serialized " << hdr.n%(name)s << " of " << heap_%(name)s.pages().size() << endl; + // datalen += hdr.n%(name)s * pgsz(); + // ''' % {'name': name, 'struct': struct}) + //]]] + //[[[end]]] + hdr.len = uint32_t(hdr.headlen + datalen + sizeof(Item) * hdr.nitems); + memput(arr, hdr); + + cout << "partially serializing items" << endl; + foreach (const Item &item, items_) { + dwriter.write(item); + } + + assert(arr.end() >= dwriter.ptr()); + + dump(neworders_, "ser", seqno); + + return arr; } -#endif +void TPCCTables::deser_partial(int mypos, int nnodes, + const tpcc_recovery_header &hdr, + const commons::array<char> &arr) +{ + mypos = nnodes; // TODO use + using namespace std; + typedef typeof(warehouses_) tree_warehouses; + + // This needs to be cleared because it's just a simple log to which things + // are appended. + history_.clear(); + history_.reserve(hdr.nhistory); + // This need to be cleared because things are erased from the map. + neworders_.clear(); + // This needs to be cleared because the hash is on the pointer. 
+ customers_by_name_.clear(); + + char *meta = arr + sizeof hdr, *data = arr + hdr.headlen; + + // XXX determine which objects are actually live + + //[[[cog + // typedefs() + // for name, struct in heappairs: + // cbn = ( r'customers_by_name_.insert(val);' + // if name == 'customers' else '' ) + // obc = ( r'orders_by_customer_.insert(obc_keyof(*val), val);' + // if name == 'orders' else '' ) + // if name == 'neworders': + // insertion = r'%(name)s_.insert(make_pair(keyof(*val), val));' + // elif name == 'history': + // insertion = r'%(name)s_.push_back(val);' + // else: + // insertion = r'%(name)s_.insert(keyof(*val), val);' + // + // cbn = cbn % {'name': name, 'struct': struct} + // obc = obc % {'name': name, 'struct': struct} + // insertion = insertion % {'name': name, 'struct': struct} + // + // cog.outl(r''' + // versioned_heap<%(struct)s> &heap_%(name)s = heaps<%(struct)s>::heap; + // heap_%(name)s.deser_partial(meta, data, hdr.n%(name)s); + // + // // TODO scan over only the serialied portion? + // for (versioned_heap<%(struct)s>::iterator iter_%(name)s = heap_%(name)s.begin(); + // iter_%(name)s.cur() != nullptr; iter_%(name)s.next()) { + // %(struct)s *val = iter_%(name)s.cur(); + // %(insertion)s + // %(cbn)s%(obc)s + // } + // + // meta += heap_%(name)s.metasize(); + // data += datasize(heap_%(name)s); + // ''' % {'name': name, 'struct': struct, 'cbn': cbn, 'obc': obc, + // 'insertion': insertion}) + //]]] + //[[[end]]] + + raw_reader reader(data); + // This needs to be cleared because it's just a simple vector of randomly + // generated items (the set is static). 
+ items_.clear(); + items_.reserve(hdr.nitems); + for (uint32_t i = 0; i < hdr.nitems; ++i) { + items_.push_back(reader.read<Item>()); + } + + serbuf_.reset(arr.get(), arr.size()); + + dump(neworders_, "deser", hdr.seqno); +} + +void +TPCCTables::remove(const string &basepath) const +{ + try_rm(basepath.c_str()); + //[[[cog + // for name, struct in heappairs: + // cog.outl(r''' + // try_rm((basepath + "_%(name)s").c_str()); + // ''' % {'name':name,'struct':struct}) + //]]] + //[[[end]]] + try_rm((basepath+"_items").c_str()); +} + +/** + * Updates the files starting at basepath. If the files already exist their + * contents must consist of older versions of these tables! + */ +void +TPCCTables::write(const string &basepath, const commons::array<char> &data) const +{ + tpcc_recovery_header hdr; + memget(data, hdr); + char *p = data + hdr.headlen; + closingfd fd(checknnegerr(creat(basepath.c_str(), 0644))); + write_file(fd, data, hdr.headlen); + + //[[[cog + // for name, struct in heappairs: + // cog.outl(''' + // versioned_heap<%(struct)s> &heap_%(name)s = heaps<%(struct)s>::heap; + // { + // closingfd fd(checknnegerr(open((basepath + "_%(name)s").c_str(), O_CREAT | O_WRONLY, 0644))); + // for (size_t i = 0; i < hdr.n%(name)s; ++i) { + // off_t pos = heap_%(name)s.hdrof(p).index * pgsz(); + // checkeq(lseek(fd, pos, 0), pos); + // write_file(fd, p, pgsz()); + // p += pgsz(); + // } + // } + // ''' % {'name': name, 'struct': struct}) + //]]] + //[[[end]]] + + { + closingfd fd(checknnegerr(open((basepath + "_items").c_str(), O_CREAT | O_WRONLY, 0644))); + foreach (const Item &item, items_) { + write_file(fd, &item, sizeof(Item)); + } + } +} + +commons::array<char> +TPCCTables::read(const string &basepath) +{ + // Prepare buffer. 
+ closingfd fd(checknnegerr(open(basepath.c_str(), O_RDONLY))); + size_t fsz = file_size(fd), tot_fsz = fsz; + //[[[cog + // for name in allfields: + // cog.outl(r''' + // closingfd fd_%(name)s(checknnegerr(open((basepath + "_%(name)s").c_str(), O_RDONLY))); + // size_t fsz_%(name)s = file_size(fd_%(name)s); + // tot_fsz += fsz_%(name)s; + // ''' % {'name': name}) + //]]] + //[[[end]]] + char *rawbuf; + check0x(posix_memalign(reinterpret_cast<void**>(&rawbuf), pgsz(), tot_fsz)); + + // Read. + char *p = rawbuf; + checkeqnneg(::read(fd, p, fsz), ssize_t(fsz)); + p += fsz; + //[[[cog + // for name in allfields: + // cog.outl(r''' + // { + // checkeqnneg( ::read(fd_%(name)s, p, fsz_%(name)s), + // ssize_t(fsz_%(name)s) ); + // p += fsz_%(name)s; + // } + // ''' % {'name': name}) + //]]] + //[[[end]]] + + return commons::array<char>(rawbuf, tot_fsz); +} + // vim:ft=cpp Modified: ydb/trunk/src/tpcc/tpcctables.h =================================================================== --- ydb/trunk/src/tpcc/tpcctables.h 2009-05-15 06:57:32 UTC (rev 1417) +++ ydb/trunk/src/tpcc/tpcctables.h 2009-05-15 06:58:54 UTC (rev 1418) @@ -92,8 +92,16 @@ commons::array<char> ser(int mypos, int nnodes, int seqno) const; void deser(int mypos, int nnodes, const tpcc_recovery_header &hdr, const commons::array<char> &arr); + commons::array<char> ser_partial(const commons::array<char> &summary, + int mypos, int nnodes, int seqno) const; + void deser_partial(int mypos, int nnodes, const tpcc_recovery_header &hdr, + const commons::array<char> &arr); commons::array<char> summarize() const; + void remove(const string &basepath) const; + void write(const string &basepath, const commons::array<char> &data) const; + commons::array<char> read(const string &basepath); + static const int KEYS_PER_INTERNAL = 8; static const int KEYS_PER_LEAF = 8; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
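The core idea behind `ser_partial` above is incremental snapshotting: compare each heap page's version against the previous snapshot's summary and serialize only the pages that changed. A toy dict-based sketch of that comparison (data layout here is hypothetical, not the versioned_heap format):

```python
def changed_pages(prev_versions, pages):
    """Return only the pages that need re-serializing.

    prev_versions maps page index -> version recorded in the last
    snapshot's summary; pages maps page index -> (version, payload).
    A page is kept if it is new or its version moved since the summary.
    """
    out = {}
    for idx, (version, payload) in pages.items():
        if prev_versions.get(idx) != version:
            out[idx] = (version, payload)
    return out
```

This is why the snapshot writer in r1418 can seek to `hdrof(p).index * pgsz()` and overwrite pages in place: unchanged pages already on disk are simply skipped.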
From: <yan...@us...> - 2009-05-15 06:57:41
Revision: 1417 http://assorted.svn.sourceforge.net/assorted/?rev=1417&view=rev Author: yangzhang Date: 2009-05-15 06:57:32 +0000 (Fri, 15 May 2009) Log Message: ----------- - fixed printing in persist-ssh - inflated the start-recovery-seqno in nrec to match up more with drec Modified Paths: -------------- ydb/trunk/tools/persist-ssh ydb/trunk/tools/test.bash Modified: ydb/trunk/tools/persist-ssh =================================================================== --- ydb/trunk/tools/persist-ssh 2009-05-14 14:37:52 UTC (rev 1416) +++ ydb/trunk/tools/persist-ssh 2009-05-15 06:57:32 UTC (rev 1417) @@ -8,7 +8,7 @@ while True: line = sys.stdin.readline() if line == "": break - print line + print line, if "Agent admitted failure" in line: sys.exit(77) ' || { ret=$? Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-05-14 14:37:52 UTC (rev 1416) +++ ydb/trunk/tools/test.bash 2009-05-15 06:57:32 UTC (rev 1417) @@ -441,7 +441,7 @@ nrec-helper() { local leader=$1 shift - : ${seqno:=100000} ${extraargs:=} + : ${seqno:=200000} ${extraargs:=} if [[ $param ]] then local paramargs="--$param $config" else local paramargs= This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-14 14:38:04
Revision: 1416 http://assorted.svn.sourceforge.net/assorted/?rev=1416&view=rev Author: yangzhang Date: 2009-05-14 14:37:52 +0000 (Thu, 14 May 2009) Log Message: ----------- - fixed filtering issue where we were dropping off the final values of a run from the filtering/aggregation - added stacked bar chart - removed after-recovery bars - adjusted legend location - merged the catch-up phases in the rectime graphs Modified Paths: -------------- ydb/trunk/tools/analysis.py Modified: ydb/trunk/tools/analysis.py =================================================================== --- ydb/trunk/tools/analysis.py 2009-05-13 19:20:47 UTC (rev 1415) +++ ydb/trunk/tools/analysis.py 2009-05-14 14:37:52 UTC (rev 1416) @@ -12,6 +12,7 @@ def getname(path): return basename(realpath(path)) def mean(xs): return array(xs).mean() +def meanbutmin(xs): return array([x for x in xs if x != min(xs)]).mean() def last(xs): return xs[-1] def check(path): @@ -45,7 +46,8 @@ def repl(m): name, filt = m.group(1), m.group(2) # Duplicate check. - assert name not in filts, 'Capture %r exists more than once.' % name + assert filts.get(name, filt) == filt, 'Capture %r exists more than once and/or has conflicting aggregator %r.' % (name, filt) + #assert name not in filts, 'Capture %r exists more than once.' 
% name filts[name] = filt return '(?P<%s>' % name pats = [ re.sub(r'\(\?P<(\w+)\|(\w+)>', repl, pat) for pat in pats ] @@ -54,14 +56,24 @@ with file(path) as f: caps = {} # captures: name -> int/float sats = [ False for pat in pats ] - for line in f: - if line.startswith('=== '): print line,; caps = {}; sats = [False for pat in pats] + for line in itertools.chain(f, ['=== \n']): + if line.startswith('=== '): + if all(sats): + sats = [ False for pat in pats ] + caps = dict( (k, eval(filts.get(k, 'lambda x: x'))(v)) + for k,v in caps.iteritems() ) + assert all( type(v) != list for v in caps.itervalues() ) + print caps + yield xform(caps) + caps = {} + print line,; caps = {}; sats = [False for pat in pats] # if line == '\n': print '===', caps.keys(), ''.join('1' if s else '0' for s in sats) for i, pat in enumerate(pats): m = re.search(pat, line) if m: gd = dict( (k, float(v)) for (k,v) in m.groupdict().iteritems() ) for k, v in gd.iteritems(): + print k,v if k in caps: if k in filts: if type(caps[k]) != list: caps[k] = [caps[k]] @@ -72,13 +84,6 @@ caps[k] = v sats[i] = True break - if all(sats): - sats = [ False for pat in pats ] - caps = dict( (k, eval(filts.get(k, 'lambda x: x'))(v)) - for k,v in caps.iteritems() ) - assert all( type(v) != list for v in caps.itervalues() ) - yield xform(caps) - caps = {} # Aggregate the captured values. caps = list(getcaps()) print show_table1(caps) @@ -244,29 +249,35 @@ ylim(ymin = 0) savefig('stperf.png') -def groupbar(xvalues, yss): +def groupbar(xvalues, yss, stacked = False): d = yss xs = arange(len(xvalues)) cnt = sum(len(g) for g in d) + len(d) + 1 - space = 1. / cnt + space = 1. / len(d) if stacked else 1. / cnt width = space / 1.5 step = 1. 
/ sum(len(g) for g in d) hues = ( colorsys.hls_to_rgb(step * i, .7, .4) for i in itertools.count() ) ehues = ( colorsys.hls_to_rgb(step * i, .3, .4) for i in itertools.count() ) - i = 1 + i = 0 for ys in yss: print ys for g in d: + bottom = 0 for yval, yerr, label in g: - bar(xs + space * i, yval, yerr = yerr, width = width, color = hues.next(), - edgecolor = (1,1,1), ecolor = ehues.next(), label = label) - i += 1 + bar(xs + space * i + (space - width) / 2., + yval, yerr = yerr, width = width, color = hues.next(), + edgecolor = (1,1,1), ecolor = ehues.next(), label = label, + bottom = bottom) + if stacked: bottom += yval + else: i += 1 i += 1 xticks(xs + .5, xvalues) + for line in gca().get_xticklines() + gca().get_yticklines(): + line.set_visible(False) def rectps(npath, dpath, xlab): res = {} @@ -276,8 +287,8 @@ res[label] = logextract(p, 'config', [ r'=== [^=]+=(?P<config>-?\d+)', r'before recovery,.*\((?P<prerec|mean>[\d\.]+) tps\)', - r'during recovery,.*\((?P<durrec|mean>[\d\.]+) tps\)', - r'after recovery,.*\((?P<postrec|mean>[\d\.]+) tps\)', + r'during recovery,.*\((?P<durrec|meanbutmin>[\d\.]+) tps\)', + #r'after recovery,.*\((?P<postrec|mean>[\d\.]+) tps\)', ]) #r'serialized recovery of (?P<recbytes|last>\d+) bytes in (?P<recser|mean>\d+) ms', #r'handled .*\((?P<tps>[.\d]+) tps\)', @@ -289,10 +300,10 @@ # y-value y-error label [ [ (a['prerec mean'], a['prerec sd'], 'before network recovery'), (a['durrec mean'], a['durrec sd'], 'during network recovery'), - (a['postrec mean'], a['postrec sd'], 'after network recovery') ], - [ (b['prerec mean'], b['prerec sd'], 'before disk recovery'), - (b['durrec mean'], b['durrec sd'], 'during disk recovery'), - (b['postrec mean'], b['postrec sd'], 'after disk recovery') ] ] ) + ], + [ (b['prerec mean'], b['prerec sd'], 'before disk-aided recovery'), + (b['durrec mean'], b['durrec sd'], 'during disk-aided recovery'), + ] ] ) ylim(ymin = 0) xlabel(xlab) ylabel('Mean TPS (stdev error bars)') @@ -302,21 +313,18 @@ def 
rectps_warehouses(npath, dpath): return rectps(npath, dpath, 'Number of warehouses') -def rectime(npath, dpath, name, xlab): +def rectime(npath, dpath, name, xlab, legloc): res = {} for label, p in [('network recovery', npath), ('disk checkpointing', dpath)]: print '=== rectime ===' print 'file:', getname(p) res[label] = logextract(p, 'config', [ r'=== [^=]+=(?P<config>-?\d+)', - r'before recovery,.*\((?P<prerec|mean>[\d\.]+) tps\)', - r'during recovery,.*\((?P<durrec|mean>[\d\.]+) tps\)', - r'after recovery,.*\((?P<postrec|mean>[\d\.]+) tps\)', r'serialized recovery of (?P<recbytes|last>\d+) bytes in (?P<recser|mean>\d+) ms', r'received recovery message .+ in (?P<recxfer>\d+) ms', r'deserialized recovery message .+ in (?P<recdeser>\d+) ms', - r'replayer caught up; from .+ in (?P<catchup1>\d+) ms', - r'final buffer replayer caught up .+ in (?P<catchup2>\d+) ms', + r'replayer caught up; from .+ in (?P<catchup|sum>\d+) ms', + r'final buffer replayer caught up .+ in (?P<catchup|sum>\d+) ms', r'handled .*\((?P<tps>[.\d]+) tps\)', ]) @@ -327,22 +335,23 @@ pairs = [ ('recser', 'serialization'), ('recxfer', 'xfer'), ('recdeser', 'deserialization'), - ('catchup1', 'catch-up phase 1'), - ('catchup2', 'catch-up phase 2') ] + #('catchup1', 'catch-up phase 1'), + ('catchup', 'catch-up') ] groupbar( res.values()[0]['config'], - [ foo('network recovery', pairs), foo('disk checkpointing', pairs)] ) + [ foo('network recovery', pairs), foo('disk checkpointing', pairs)], + stacked = True ) ylim(ymin = 0) xlabel(xlab) ylabel('Mean time (ms)') - legend(loc = 'upper left', prop = FontProperties(size = 'xx-small')) + legend(loc = legloc, prop = FontProperties(size = 'xx-small')) savefig('rectime-%s.png' % name) def rectime_warehouses(npath, dpath): - return rectime(npath, dpath, 'warehouses', 'Number of warehouses') + return rectime(npath, dpath, 'warehouses', 'Number of warehouses', 'upper left') def rectime_ro(npath, dpath): - return rectime(npath, dpath, 'ro', '% read-only txns') + 
return rectime(npath, dpath, 'ro', '% read-only txns', 'upper right') def main(argv): if len(argv) <= 1: This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
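The filtering fix in r1416 is the classic "last group is never flushed" bug: the old loop only emitted accumulated captures when it saw the next `=== ` header, so the final run of the log was dropped. The patch fixes it by chaining a sentinel header onto the input with `itertools.chain`. A minimal sketch of that pattern (the capture format here is simplified and hypothetical):

```python
import itertools
import re

def groups(lines):
    """Group key=value capture lines by '=== ' section headers.

    Appending a final '=== ' sentinel via itertools.chain guarantees
    the last section's captures are emitted instead of silently lost.
    """
    caps, out = {}, []
    for line in itertools.chain(lines, ['=== \n']):
        if line.startswith('=== '):
            if caps:
                out.append(caps)  # flush the section just finished
            caps = {}
            continue
        m = re.match(r'(\w+)=(\d+)', line)
        if m:
            caps[m.group(1)] = int(m.group(2))
    return out
```

Without the sentinel, `groups(['=== run1\n', 'tps=5\n'])` would return `[]`; with it, the trailing section is flushed like every other one.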
From: <yan...@us...> - 2009-05-13 19:20:57
Revision: 1415 http://assorted.svn.sourceforge.net/assorted/?rev=1415&view=rev Author: yangzhang Date: 2009-05-13 19:20:47 +0000 (Wed, 13 May 2009) Log Message: ----------- added aries to scalability experiments Modified Paths: -------------- ydb/trunk/tools/analysis.py ydb/trunk/tools/test.bash Modified: ydb/trunk/tools/analysis.py =================================================================== --- ydb/trunk/tools/analysis.py 2009-05-13 19:20:34 UTC (rev 1414) +++ ydb/trunk/tools/analysis.py 2009-05-13 19:20:47 UTC (rev 1415) @@ -105,8 +105,8 @@ print return res -def scaling(npath, dpath, title): - for label, p in [('network recovery', npath), ('disk checkpointing', dpath)]: +def scaling(npath, dpath, apath, title): + for label, p in [('network recovery', npath), ('disk checkpointing', dpath), ('ARIES WAL', apath)]: print '===', title, '===' print 'file:', getname(p) res = logextract(p, 'config', [ @@ -123,14 +123,14 @@ ylabel('Mean TPS (stdev error bars)') legend(loc = 'lower left', prop = FontProperties(size = 'small')) -def scaledata(npath, dpath): - scaling(npath, dpath, 'scaledata') +def scaledata(npath, dpath, apath): + scaling(npath, dpath, apath, 'scaledata') title('Scaling of throughput with number of warehouses') xlabel('Number of warehouses') savefig('scaledata.png') -def scalenodes(npath, dpath): - scaling(npath, dpath, 'scalenodes') +def scalenodes(npath, dpath, apath): + scaling(npath, dpath, apath, 'scalenodes') title('Scaling of throughput with number of nodes') xlabel('Node count') savefig('scalenodes.png') @@ -348,9 +348,9 @@ if len(argv) <= 1: print >> sys.stderr, 'Must specify a command' elif argv[1] == 'scaledata': - scaledata(*argv[2:] if len(argv) > 2 else ['nscaledata-log', 'dscaledata-log']) + scaledata(*argv[2:] if len(argv) > 2 else ['nscaledata-log', 'dscaledata-log', 'ascaledata-log']) elif argv[1] == 'scalenodes': - scalenodes(*argv[2:] if len(argv) > 2 else ['nscalenodes-log', 'dscalenodes-log']) + scalenodes(*argv[2:] if 
len(argv) > 2 else ['nscalenodes-log', 'dscalenodes-log', 'ascalenodes-log']) elif argv[1] == 'rectps': rectps_warehouses(*argv[2:] if len(argv) > 2 else ['nrec-warehouses', 'drec-warehouses']) elif argv[1] == 'rectime-warehouses': Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-05-13 19:20:34 UTC (rev 1414) +++ ydb/trunk/tools/test.bash 2009-05-13 19:20:47 UTC (rev 1415) @@ -567,7 +567,7 @@ } # ydb scalability test. -scaling-helper() { +scalenodes-helper() { local leader=$1 : ${stopseqno:=300000} shift @@ -578,10 +578,11 @@ done wait } -scaling() { hostargs scaling-helper ; } -exp-nscaling() { name=nscaling exp-var scaling stop ; } -exp-dscaling() { name=dscaling extraargs=--disk exp-var scaling stop ; } -exp-scaling() { exp-nscaling ; exp-dscaling ; } +scalenodes() { hostargs scalenodes-helper ; } +exp-nscalenodes() { name=nscalenodes exp-var scalenodes stop ; } +exp-dscalenodes() { name=dscalenodes extraargs=--disk exp-var scalenodes stop ; } +exp-ascalenodes() { name=ascalenodes extraargs=--wal exp-var scalenodes stop ; } +exp-scalenodes() { exp-nscalenodes ; exp-dscalenodes ; exp-ascalenodes ; } # # For scaling the size of the database. @@ -622,6 +623,7 @@ scaledata() { hostargs scaledata-helper ; } exp-nscaledata() { exp-var-param nscaledata scaledata warehouses stop 5 4 3 2 1 ; } exp-dscaledata() { extraargs=--disk exp-var-param dscaledata scaledata warehouses stop 5 4 3 2 1 ; } +exp-ascaledata() { extraargs=--wal exp-var-param ascaledata scaledata warehouses stop 5 4 3 2 1 ; } exp-scaledata() { exp-nscaledata ; exp-dscaledata ; } mtcp-helper() { @@ -713,25 +715,6 @@ exp-epperf() { exp-var epperf ; } # -# WAL experiments. 
-# - -aries() { - extraargs='--wal' scaling ${hosts:-} -} - -exp-aries() { - local out=aries-log-$(date +%Y-%m-%d-%H:%M:%S) - ln -sf $out aries-log - for i in {1..3} ; do - echo === n=-1 i=$i === - echo === n=-1 i=$i === > `tty` - aries - echo - done >& $out -} - -# # Prototype 2 #
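The `logextract` calls in analysis.py above pull repeated measurements out of experiment logs keyed by a `=== n=... ===` config marker, then aggregate them per configuration. The annotated group syntax (`(?P<tps|mean>...)`) is project-specific; a plain-`re` sketch of the same idea, with illustrative log text and a simplified return shape, might look like:

```python
import re
from statistics import mean, pstdev

# Hypothetical sample of the kind of output test.bash produces.
LOG = """\
=== n=2 i=1 ===
handled 1000 txns (512.0 tps)
=== n=2 i=2 ===
handled 1000 txns (498.0 tps)
=== n=3 i=1 ===
handled 1500 txns (730.5 tps)
"""

def logextract(text, pattern):
    """Group measurements by the surrounding '=== n=... ===' config marker,
    then reduce each group to (mean, stdev) -- the same shape analysis.py
    feeds to errorbar()."""
    groups = {}
    config = None
    for line in text.splitlines():
        m = re.match(r'=== n=(-?\d+) ', line)
        if m:
            config = int(m.group(1))
            continue
        m = re.search(pattern, line)
        if m and config is not None:
            groups.setdefault(config, []).append(float(m.group(1)))
    return {cfg: (mean(vals), pstdev(vals)) for cfg, vals in groups.items()}

stats = logextract(LOG, r'handled .*\(([.\d]+) tps\)')
print(stats)  # {2: (505.0, 7.0), 3: (730.5, 0.0)}
```

The actual analysis.py version also tracks per-group aggregators (mean, sum, last) in the group names themselves; this sketch fixes the aggregation to mean/stdev for brevity.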
From: <yan...@us...> - 2009-05-13 19:20:36
Revision: 1414 http://assorted.svn.sourceforge.net/assorted/?rev=1414&view=rev Author: yangzhang Date: 2009-05-13 19:20:34 +0000 (Wed, 13 May 2009) Log Message: ----------- added note Modified Paths: -------------- ydb/trunk/README Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-05-13 19:20:13 UTC (rev 1413) +++ ydb/trunk/README 2009-05-13 19:20:34 UTC (rev 1414) @@ -1061,3 +1061,5 @@ - Network is simpler, can be stateless - Disk may quickly grow stale during downtime +- Network is more pipelined than disk; disk latency deeply intertwined with + throughput, but not so for network
From: <yan...@us...> - 2009-05-13 19:20:26
Revision: 1413 http://assorted.svn.sourceforge.net/assorted/?rev=1413&view=rev Author: yangzhang Date: 2009-05-13 19:20:13 +0000 (Wed, 13 May 2009) Log Message: ----------- added asynchronous (background) wal (somewhat cheats since no time shifting; actually want to overlap wal of current txns with execing of next txns) Modified Paths: -------------- ydb/trunk/src/main.clamp.lzz ydb/trunk/src/replica.clamp.lzz ydb/trunk/src/tpcc/tpcctables.cc.cog ydb/trunk/src/tpcc.clamp.lzz Modified: ydb/trunk/src/main.clamp.lzz =================================================================== --- ydb/trunk/src/main.clamp.lzz 2009-05-13 17:30:11 UTC (rev 1412) +++ ydb/trunk/src/main.clamp.lzz 2009-05-13 19:20:13 UTC (rev 1413) @@ -38,6 +38,7 @@ typedef tuple<sized_array<char>, char*, char*> chunk; typedef commons::array<char> recovery_t; +typedef commons::array<char> wal_chunk; // Configuration. Modified: ydb/trunk/src/replica.clamp.lzz =================================================================== --- ydb/trunk/src/replica.clamp.lzz 2009-05-13 17:30:11 UTC (rev 1412) +++ ydb/trunk/src/replica.clamp.lzz 2009-05-13 19:20:13 UTC (rev 1413) @@ -31,6 +31,11 @@ thread(bind(snapshot_writer)); } + if (do_wal) { + cout << "starting the WAL writer" << endl; + thread(bind(wal_writer)); + } + // Initialize database state. 
int seqno = -1; mii &map = g_map; Modified: ydb/trunk/src/tpcc/tpcctables.cc.cog =================================================================== --- ydb/trunk/src/tpcc/tpcctables.cc.cog 2009-05-13 17:30:11 UTC (rev 1412) +++ ydb/trunk/src/tpcc/tpcctables.cc.cog 2009-05-13 19:20:13 UTC (rev 1413) @@ -1033,4 +1033,29 @@ return a; } +#if 0 +void compare_summaries(const commons::array<char> &a, const commons::array<char> &b) +{ + raw_reader r(a); + tpcc_recovery_header hdr; + r.read(hdr); + //[[[cog + // for name, struct in heappairs: + // cog.outl(r''' + // of << "%(name)s: " << hdr.n%(name)s << endl; + // for (size_t i = 0; i < hdr.n%(name)s; ++i) { + // of << r.read<size_t>() << ' '; + // of << r.read<int>() << endl; + // } + // ''' % {'name': name, 'struct': struct}) + //]]] + //[[[end]]] +} + +TPCCTables::restore_partial(sizeof hdr) const +{ + restore_partial +} +#endif + // vim:ft=cpp Modified: ydb/trunk/src/tpcc.clamp.lzz =================================================================== --- ydb/trunk/src/tpcc.clamp.lzz 2009-05-13 17:30:11 UTC (rev 1412) +++ ydb/trunk/src/tpcc.clamp.lzz 2009-05-13 19:20:13 UTC (rev 1413) @@ -498,7 +498,7 @@ int first_seqno_in_chunk = -1; TpccReq req; TpccRes res; - txn_wal &wal = *g_twal; + //txn_wal &wal = *g_twal; function<void(anchored_stream_reader& reader)> overflow_fn = lambda(anchored_stream_reader &reader) { @@ -536,6 +536,7 @@ commons::array<char> wbuf(buf_size); anchored_stream_reader reader(st_read_fn(leader), st_read_fully_fn(leader), overflow_fn, rbuf.get(), rbuf.size()); + realwal wal; writer w(lambda(const void *buf, size_t len) { if (do_wal) __ref(wal).sync(); st_write(__ref(leader), buf, len); @@ -706,9 +707,44 @@ namespace { concurrent_queue<recovery_t> snapshots; atomic<bool> snapshot_writer_busy; + +concurrent_queue<wal_chunk> wal_chunks; +atomic<bool> wal_writer_busy; + +class realwal { + public: + realwal() : data_(read_buf_size), head_(data_) {} + void logbuf(void *data, size_t len) { + assert(head_ + 
len < data_.end()); + memcpy(head_, data, len); + head_ += len; + } + void sync() { + wal_chunk tmp_(buf_size); + swap(data_, tmp_); + wal_chunks.push_cond(move(tmp_), 0); + head_ = data_; + } + private: + wal_chunk data_; + char *head_; +}; } void +wal_writer() +{ + cout << "wal writer starting" << endl; + closingfd fd(checknnegerr(creat("wal", 0644))); + while (true) { + wal_chunk chunk = wal_chunks.take(); + if (chunk.get() == nullptr) break; + checkeqnneg(write(fd, chunk, chunk.size()), ssize_t(chunk.size())); + fdatasync(fd); + } +} + +void snapshot_writer() { cout << "snapshot writer starting" << endl;
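The asynchronous WAL added in r1413 double-buffers log records in memory and hands full chunks to a background writer thread that does the write + fdatasync off the transaction path. The same pattern can be sketched in Python; `BackgroundWAL` and its queue here are simplified stand-ins for the C++ `realwal`/`concurrent_queue` machinery, not the actual ydb code.

```python
import os
import queue
import tempfile
import threading

class BackgroundWAL:
    """Minimal sketch of a double-buffered write-ahead log: logbuf() appends
    to an in-memory buffer; sync() hands the full buffer to a writer thread
    that persists it with write + fsync, so callers never block on disk."""

    _SENTINEL = None

    def __init__(self, path):
        self._buf = bytearray()
        self._chunks = queue.Queue()
        self._fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o644)
        self._writer = threading.Thread(target=self._write_loop)
        self._writer.start()

    def logbuf(self, data: bytes) -> None:
        self._buf += data  # buffered in memory, no I/O yet

    def sync(self) -> None:
        # Swap out the current buffer and queue it for the writer thread.
        chunk, self._buf = bytes(self._buf), bytearray()
        self._chunks.put(chunk)

    def close(self) -> None:
        self.sync()
        self._chunks.put(self._SENTINEL)
        self._writer.join()
        os.close(self._fd)

    def _write_loop(self) -> None:
        while True:
            chunk = self._chunks.get()
            if chunk is self._SENTINEL:
                break
            os.write(self._fd, chunk)
            os.fsync(self._fd)  # durability point, off the caller's path

path = os.path.join(tempfile.mkdtemp(), "wal")
wal = BackgroundWAL(path)
for i in range(100):
    wal.logbuf(b"txn-record")
    if i % 10 == 9:
        wal.sync()  # group-commit every 10 records
wal.close()
print(os.path.getsize(path))  # 100 * len(b"txn-record") = 1000
```

As the log message itself notes, this "somewhat cheats": durability lags the transactions it covers unless commits are held back until their chunk is synced.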
From: <yan...@us...> - 2009-05-13 17:30:22
Revision: 1412 http://assorted.svn.sourceforge.net/assorted/?rev=1412&view=rev Author: yangzhang Date: 2009-05-13 17:30:11 +0000 (Wed, 13 May 2009) Log Message: ----------- added aries WALing Modified Paths: -------------- ydb/trunk/src/main.clamp.lzz ydb/trunk/src/rectpcc.clamp.lzz ydb/trunk/src/replica.clamp.lzz ydb/trunk/src/tpcc.clamp.lzz Modified: ydb/trunk/src/main.clamp.lzz =================================================================== --- ydb/trunk/src/main.clamp.lzz 2009-05-13 16:55:10 UTC (rev 1411) +++ ydb/trunk/src/main.clamp.lzz 2009-05-13 17:30:11 UTC (rev 1412) @@ -1,6 +1,7 @@ #hdr #include "unsetprefs.h" #include <boost/tuple/tuple.hpp> +#include <commons/closing.h> #include <commons/st/intr.h> #include <commons/st/sync.h> #include <commons/st/channel.h> @@ -28,7 +29,7 @@ #include <commons/st/io.h> #include <commons/st/sockets.h> #include <iostream> -#include <unistd.h> // pipe, write, sync +#include <unistd.h> // pipe, write, fdatasync #include "tpcc/tpcctables.h" #include "msg.h" #include "setprefs.h" @@ -202,14 +203,15 @@ // TODO? 
class txn_wal { public: - txn_wal(const string &fname) : of(fname.c_str()) {} + txn_wal(const string &fname) : fd_(checknnegerr(creat(fname.c_str(), 0644))) {} void logbuf(const ser_t &s) { logbuf(s.data(), s.size()); } void logbuf(const void *buf, size_t len) { - of.write(reinterpret_cast<const char*>(buf), len); + checkeqnneg(write(fd_, reinterpret_cast<const char*>(buf), len), + ssize_t(len)); } - void flush() { of.flush(); } + void sync() { fdatasync(fd_); } private: - ofstream of; + closingfd fd_; }; // Globals @@ -257,8 +259,7 @@ cout << "got joiner's connection, sending log from seqnos " << r.start_seqno << " to " << r.end_seqno << endl; - g_twal->flush(); - sync(); + g_twal->sync(); ifstream inf("twal"); long long start_time = current_time_millis(); for (int seqno = 0; seqno < r.start_seqno; ++seqno) { Modified: ydb/trunk/src/rectpcc.clamp.lzz =================================================================== --- ydb/trunk/src/rectpcc.clamp.lzz 2009-05-13 16:55:10 UTC (rev 1411) +++ ydb/trunk/src/rectpcc.clamp.lzz 2009-05-13 17:30:11 UTC (rev 1412) @@ -64,8 +64,7 @@ int &seqno = __ref(seqno); cout << "recovering from twal" << endl; long long start_time = current_time_millis(); - g_twal->flush(); - sync(); + g_twal->sync(); ifstream inf("twal"); TpccReq req; while (inf.peek() != ifstream::traits_type::eof()) { Modified: ydb/trunk/src/replica.clamp.lzz =================================================================== --- ydb/trunk/src/replica.clamp.lzz 2009-05-13 16:55:10 UTC (rev 1411) +++ ydb/trunk/src/replica.clamp.lzz 2009-05-13 17:30:11 UTC (rev 1412) @@ -112,7 +112,7 @@ } // Initialize physical or txn log. - scoped_ptr<txn_wal> twal(new txn_wal(use_twal ? "twal" : "/dev/null")); + scoped_ptr<txn_wal> twal(new txn_wal(do_wal ? "twal" : "/dev/null")); g_twal = twal.get(); scoped_ptr<wal> pwal(new wal(use_pwal ? 
"pwal" : "/dev/null")); g_wal = pwal.get(); Modified: ydb/trunk/src/tpcc.clamp.lzz =================================================================== --- ydb/trunk/src/tpcc.clamp.lzz 2009-05-13 16:55:10 UTC (rev 1411) +++ ydb/trunk/src/tpcc.clamp.lzz 2009-05-13 17:30:11 UTC (rev 1412) @@ -536,12 +536,8 @@ commons::array<char> wbuf(buf_size); anchored_stream_reader reader(st_read_fn(leader), st_read_fully_fn(leader), overflow_fn, rbuf.get(), rbuf.size()); - // TODO - //commit_log cwal; writer w(lambda(const void *buf, size_t len) { - //if (do_wal) { - //__ref(cwal).flush(); - //} + if (do_wal) __ref(wal).sync(); st_write(__ref(leader), buf, len); }, wbuf.get(), wbuf.size()); @@ -613,7 +609,7 @@ assert(!depleting); - if (use_twal) wal.logbuf(marker, reader.start() - marker); + if (do_wal) wal.logbuf(marker, reader.start() - marker); // Backlog (auto/implicit) or process. if (!caught_up) { @@ -636,7 +632,7 @@ ser(w, res); reader.set_anchor(); - // Snapsphot. + // Snapshot. if (disk && check_interval(req.seqno(), snapshot_interval)) snapshot(seqno); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-13 16:55:18
Revision: 1411 http://assorted.svn.sourceforge.net/assorted/?rev=1411&view=rev Author: yangzhang Date: 2009-05-13 16:55:10 +0000 (Wed, 13 May 2009) Log Message: ----------- added file sync demo Added Paths: ----------- sandbox/trunk/src/cc/sync.cc Added: sandbox/trunk/src/cc/sync.cc =================================================================== --- sandbox/trunk/src/cc/sync.cc (rev 0) +++ sandbox/trunk/src/cc/sync.cc 2009-05-13 16:55:10 UTC (rev 1411) @@ -0,0 +1,58 @@ +#include <unistd.h> +#include <fstream> +#include <commons/time.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +using namespace commons; +using namespace std; +enum { sz = 4096 }; + +int main() { + char str[sz]; + int fd = creat("/tmp/out", 0644); + + { + long long start = current_time_millis(); + int i; + for (i = 0; (i & 0xf) != 0xf || current_time_millis() - start < 1000; ++i) { + write(fd, str, sz); + } + long long end = current_time_millis(); + cout << i << ' ' << end - start << endl; + fdatasync(fd); + } + + { + long long start = current_time_millis(); + int i; + for (i = 0; (i & 0xf) != 0xf || current_time_millis() - start < 1000; ++i) { + write(fd, str, sz); + fdatasync(fd); + } + long long end = current_time_millis(); + cout << i << ' ' << end - start << endl; + } + + { + long long start = current_time_millis(); + int i; + for (i = 0; (i & 0xf) != 0xf || current_time_millis() - start < 1000; ++i) { + write(fd, str, sz); + long long s = current_time_millis(); + fdatasync(fd); + cout << current_time_millis() - s << endl; + } + long long end = current_time_millis(); + cout << i << ' ' << end - start << endl; + } + +#if 0 + // Can't get the fd of an fstream. + ofstream of("/tmp/out"); + of.write(str, sz); +#endif + + return 0; +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
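The sync.cc demo above contrasts throughput of buffered writes against per-write fdatasync. The same comparison can be sketched in Python using `os.write` and `os.fsync` on a temp file (portable; the C++ demo's fdatasync is Linux-specific); the 4 KiB block size mirrors sync.cc, and the size of the gap is machine- and filesystem-dependent.

```python
import os
import tempfile
import time

def timed_writes(n, block, sync_each):
    """Write n blocks to a fresh temp file; optionally fsync after each
    write. Returns elapsed seconds, making the buffered/synced gap visible."""
    fd, path = tempfile.mkstemp()
    try:
        start = time.monotonic()
        for _ in range(n):
            os.write(fd, block)
            if sync_each:
                os.fsync(fd)  # force each block to stable storage
        if not sync_each:
            os.fsync(fd)      # one final sync, like the first C++ loop
        return time.monotonic() - start
    finally:
        os.close(fd)
        os.unlink(path)

block = b"\0" * 4096  # same 4 KiB block size as sync.cc
buffered = timed_writes(200, block, sync_each=False)
synced = timed_writes(200, block, sync_each=True)
print(f"buffered: {buffered:.4f}s  synced: {synced:.4f}s")
```

On a rotating disk the per-write sync typically costs a seek-bound latency per block, which is exactly the cost the background WAL writer above is designed to hide.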
From: <yan...@us...> - 2009-05-12 20:31:55
Revision: 1410 http://assorted.svn.sourceforge.net/assorted/?rev=1410&view=rev Author: yangzhang Date: 2009-05-12 20:31:33 +0000 (Tue, 12 May 2009) Log Message: ----------- updated progress on rqe2 slides Modified Paths: -------------- ydb/trunk/README Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-05-12 20:29:27 UTC (rev 1409) +++ ydb/trunk/README 2009-05-12 20:31:33 UTC (rev 1410) @@ -662,13 +662,17 @@ - 0 - ? - # replicas - - runtime performance: baseline throughput vs. mechanism, baseline throughput vs. db size + - baseline runtime performance: + X throughput vs. mechanism + X throughput vs. db size + X throughput vs. nnodes - recovery experiments: + X DB size (MB) + X xact mix (% r/o) + - distribution + X system tput during various phases - crash time (MB accum or sec passed) - insertion rate (%) - - DB size (MB) - - xact mix (% r/o) - - system tput during various phases - implement incremental snapshotting of the hash table
From: <yan...@us...> - 2009-05-12 20:29:36
Revision: 1409 http://assorted.svn.sourceforge.net/assorted/?rev=1409&view=rev Author: yangzhang Date: 2009-05-12 20:29:27 +0000 (Tue, 12 May 2009) Log Message: ----------- - added persist-ssh - added more experiments in prep for rqe2 talk to test.bash and analysis.py Modified Paths: -------------- ydb/trunk/tools/analysis.py ydb/trunk/tools/test.bash Added Paths: ----------- ydb/trunk/tools/persist-ssh Modified: ydb/trunk/tools/analysis.py =================================================================== --- ydb/trunk/tools/analysis.py 2009-05-12 20:28:33 UTC (rev 1408) +++ ydb/trunk/tools/analysis.py 2009-05-12 20:29:27 UTC (rev 1409) @@ -105,56 +105,36 @@ print return res -def scaling(scalingpath, ariespath): - print '=== scaling ===' - print 'file:', getname(scalingpath) - res = logextract(scalingpath, 'n', [ - r'=== n=(?P<n>-?\d+) ', - r'handled .*\((?P<tps>[.\d]+) tps\)' ]) +def scaling(npath, dpath, title): + for label, p in [('network recovery', npath), ('disk checkpointing', dpath)]: + print '===', title, '===' + print 'file:', getname(p) + res = logextract(p, 'config', [ + r'=== [^=]+=(?P<config>-?\d+) ', + r'handled .*\((?P<tps>[.\d]+) tps\)' ]) - print 'file:', getname(scalingpath) - res0 = logextract(scalingpath, 'n', [ - r'=== n=(?P<n>-?\d+) ', - r'issued .*\((?P<tps>[.\d]+) tps\)' ]) + print res['config'] + print res['tps mean'] + print res['tps sd'] - if path(ariespath).exists(): - print 'file:', getname(ariespath) - res2 = logextract(ariespath, 'n', [ - r'=== n=(?P<n>-?\d+) ', - r'issued .*\((?P<tps>[.\d]+) tps\)' ]) + errorbar(res['config'], res['tps mean'], res['tps sd'], label = label) + xlim(res['config'].min() - .5, res['config'].max() + .5) + ylim(ymin = 0) + ylabel('Mean TPS (stdev error bars)') + legend(loc = 'lower left', prop = FontProperties(size = 'small')) - print hstack([res2['n'], res0['n'][:1], res['n']]) - print hstack([res2['tps mean'], res0['tps mean'][:1], res['tps mean']]) - print hstack([res2['tps sd'], res0['tps sd'][:1], 
res['tps sd']]) +def scaledata(npath, dpath): + scaling(npath, dpath, 'scaledata') + title('Scaling of throughput with number of warehouses') + xlabel('Number of warehouses') + savefig('scaledata.png') - errorbar( - hstack([res2['n'], res0['n'][:1], res['n']]), - hstack([res2['tps mean'], res0['tps mean'][:1], res['tps mean']]), - hstack([res2['tps sd'], res0['tps sd'][:1], res['tps sd']])) - title('Scaling of baseline throughput with number of nodes') - xlabel('Node count') - ylabel('Mean TPS (stdev error bars)') - xlim(hstack([res2['n'], res['n']]).min() - .5, - hstack([res2['n'], res['n']]).max() + .5) - ylim(ymin = 0) - savefig('scaling.png') - else: - print hstack([res0['n'][:1], res['n']]) - print hstack([res0['tps mean'][:1], res['tps mean']]) - print hstack([res0['tps sd'][:1], res['tps sd']]) +def scalenodes(npath, dpath): + scaling(npath, dpath, 'scalenodes') + title('Scaling of throughput with number of nodes') + xlabel('Node count') + savefig('scalenodes.png') - errorbar( - hstack([res0['n'][:1], res['n']]), - hstack([res0['tps mean'][:1], res['tps mean']]), - hstack([res0['tps sd'][:1], res['tps sd']])) - title('Scaling of baseline throughput with number of nodes') - xlabel('Node count') - ylabel('Mean TPS (stdev error bars)') - xlim(hstack([res['n']]).min() - .5, - hstack([res['n']]).max() + .5) - ylim(ymin = 0) - savefig('scaling.png') - def run(singlepath, multipath): singlepath, multipath = map(path, [singlepath, multipath]) ress = [] @@ -264,11 +244,119 @@ ylim(ymin = 0) savefig('stperf.png') +def groupbar(xvalues, yss): + d = yss + + xs = arange(len(xvalues)) + cnt = sum(len(g) for g in d) + len(d) + 1 + space = 1. / cnt + width = space / 1.5 + + step = 1. 
/ sum(len(g) for g in d) + hues = ( colorsys.hls_to_rgb(step * i, .7, .4) for i in itertools.count() ) + ehues = ( colorsys.hls_to_rgb(step * i, .3, .4) for i in itertools.count() ) + i = 1 + for ys in yss: + print ys + + for g in d: + for yval, yerr, label in g: + bar(xs + space * i, yval, yerr = yerr, width = width, color = hues.next(), + edgecolor = (1,1,1), ecolor = ehues.next(), label = label) + i += 1 + i += 1 + + xticks(xs + .5, xvalues) + +def rectps(npath, dpath, xlab): + res = {} + for label, p in [('network recovery', npath), ('disk checkpointing', dpath)]: + print '=== rectps ===' + print 'file:', getname(p) + res[label] = logextract(p, 'config', [ + r'=== [^=]+=(?P<config>-?\d+)', + r'before recovery,.*\((?P<prerec|mean>[\d\.]+) tps\)', + r'during recovery,.*\((?P<durrec|mean>[\d\.]+) tps\)', + r'after recovery,.*\((?P<postrec|mean>[\d\.]+) tps\)', + ]) + #r'serialized recovery of (?P<recbytes|last>\d+) bytes in (?P<recser|mean>\d+) ms', + #r'handled .*\((?P<tps>[.\d]+) tps\)', + + a,b = res['network recovery'], res['disk checkpointing'] + + groupbar( + res.values()[0]['config'], + # y-value y-error label + [ [ (a['prerec mean'], a['prerec sd'], 'before network recovery'), + (a['durrec mean'], a['durrec sd'], 'during network recovery'), + (a['postrec mean'], a['postrec sd'], 'after network recovery') ], + [ (b['prerec mean'], b['prerec sd'], 'before disk recovery'), + (b['durrec mean'], b['durrec sd'], 'during disk recovery'), + (b['postrec mean'], b['postrec sd'], 'after disk recovery') ] ] ) + ylim(ymin = 0) + xlabel(xlab) + ylabel('Mean TPS (stdev error bars)') + legend(loc = 'lower left', prop = FontProperties(size = 'small')) + savefig('rectps.png') + +def rectps_warehouses(npath, dpath): + return rectps(npath, dpath, 'Number of warehouses') + +def rectime(npath, dpath, name, xlab): + res = {} + for label, p in [('network recovery', npath), ('disk checkpointing', dpath)]: + print '=== rectime ===' + print 'file:', getname(p) + res[label] = 
logextract(p, 'config', [ + r'=== [^=]+=(?P<config>-?\d+)', + r'before recovery,.*\((?P<prerec|mean>[\d\.]+) tps\)', + r'during recovery,.*\((?P<durrec|mean>[\d\.]+) tps\)', + r'after recovery,.*\((?P<postrec|mean>[\d\.]+) tps\)', + r'serialized recovery of (?P<recbytes|last>\d+) bytes in (?P<recser|mean>\d+) ms', + r'received recovery message .+ in (?P<recxfer>\d+) ms', + r'deserialized recovery message .+ in (?P<recdeser>\d+) ms', + r'replayer caught up; from .+ in (?P<catchup1>\d+) ms', + r'final buffer replayer caught up .+ in (?P<catchup2>\d+) ms', + r'handled .*\((?P<tps>[.\d]+) tps\)', + ]) + + def foo(k, pairs): + x = res[k] + return [(x[name + ' mean'], x[name + ' sd'], k + ' ' + label) for name, label in pairs] + + pairs = [ ('recser', 'serialization'), + ('recxfer', 'xfer'), + ('recdeser', 'deserialization'), + ('catchup1', 'catch-up phase 1'), + ('catchup2', 'catch-up phase 2') ] + groupbar( res.values()[0]['config'], + [ foo('network recovery', pairs), foo('disk checkpointing', pairs)] ) + + ylim(ymin = 0) + xlabel(xlab) + ylabel('Mean time (ms)') + legend(loc = 'upper left', prop = FontProperties(size = 'xx-small')) + savefig('rectime-%s.png' % name) + +def rectime_warehouses(npath, dpath): + return rectime(npath, dpath, 'warehouses', 'Number of warehouses') + +def rectime_ro(npath, dpath): + return rectime(npath, dpath, 'ro', '% read-only txns') + def main(argv): if len(argv) <= 1: print >> sys.stderr, 'Must specify a command' - elif argv[1] == 'scaling': - scaling(*argv[2:] if len(argv) > 2 else ['scaling-log', 'aries-log']) + elif argv[1] == 'scaledata': + scaledata(*argv[2:] if len(argv) > 2 else ['nscaledata-log', 'dscaledata-log']) + elif argv[1] == 'scalenodes': + scalenodes(*argv[2:] if len(argv) > 2 else ['nscalenodes-log', 'dscalenodes-log']) + elif argv[1] == 'rectps': + rectps_warehouses(*argv[2:] if len(argv) > 2 else ['nrec-warehouses', 'drec-warehouses']) + elif argv[1] == 'rectime-warehouses': + rectime_warehouses(*argv[2:] if 
len(argv) > 2 else ['nrec-warehouses', 'drec-warehouses']) + elif argv[1] == 'rectime-ro': + rectime_ro(*argv[2:] if len(argv) > 2 else ['nrec-ro', 'drec-ro']) elif argv[1] == 'run': run(*argv[2:] if len(argv) > 2 else ['single-log', 'multi-log']) elif argv[1] == 'mtcp': Added: ydb/trunk/tools/persist-ssh =================================================================== --- ydb/trunk/tools/persist-ssh (rev 0) +++ ydb/trunk/tools/persist-ssh 2009-05-12 20:29:27 UTC (rev 1409) @@ -0,0 +1,21 @@ +#!/usr/bin/env bash + +set -o errexit + +while true; do + ssh -o PasswordAuthentication=no "$@" 2>&1 | python -u -c ' +import sys +while True: + line = sys.stdin.readline() + if line == "": break + print line + if "Agent admitted failure" in line: sys.exit(77) +' || { + ret=$? + if [[ $ret == 77 ]] + then continue + else return $ret + fi +} + break +done Property changes on: ydb/trunk/tools/persist-ssh ___________________________________________________________________ Added: svn:executable + * Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-05-12 20:28:33 UTC (rev 1408) +++ ydb/trunk/tools/test.bash 2009-05-12 20:29:27 UTC (rev 1409) @@ -48,8 +48,11 @@ mssh -l root "$@" } +# Both tags each line of output and additionally retries ssh if there was an +# error logging in with the agent. tagssh() { - ssh "$@" 2>&1 | python -u -c ' + while true; do + ssh -o PasswordAuthentication=no "$@" 2>&1 | python -u -c ' import time, sys, socket # def fmt(*xs): return " ".join(map(str, xs)) + "\n" # s = socket.socket() @@ -60,7 +63,10 @@ line = sys.stdin.readline() if line == "": break print >> f, sys.argv[1], time.time(), ":\t", line, -' $1 + if "Agent admitted failure" in line: sys.exit(77) +' $1 || if (( $? 
== 77 )) ; then continue ; fi + break + done } check-remote() { @@ -81,7 +87,7 @@ } parssh() { - parhosts ssh ^ "set -o errexit -o nounset; $@" + parhosts "$(dirname "$0")/persist-ssh" ^ "set -o errexit -o nounset; $@" } parscp() { @@ -204,6 +210,14 @@ refresh-local } +node-setup-python-commons() { + check-remote + svn up -q ~/work/assorted/ + cd ~/work/assorted/python-commons/trunk/ + ./setup.bash -d -p ~/.local/pkg/python-commons/ + refresh-local +} + node-setup-shell-tools() { check-remote svn up -q ~/work/assorted/ @@ -229,7 +243,8 @@ refresh-local cd ~/ydb/src make -s clean - PPROF=1 OPT=1 make -sj16 ydb WTF= DISTCC= + OPT=1 make -sj16 ydb WTF= + #PPROF=1 OPT=1 make -sj16 ydb WTF= DISTCC= } node-setup-cog() { @@ -314,6 +329,7 @@ parremote node-setup-m4 parremote node-setup-bison parremote node-setup-clamp + parremote node-setup-python-commons parremote node-setup-gtest parremote node-setup-ghash parremote node-setup-cog @@ -392,20 +408,55 @@ # Experiments involving ydb recovery (varying amount of data). # -rec-helper() { +drec-helper() { local leader=$1 shift + : ${seqno:=170000} ${extraargs:=} + if [[ $param ]] + then local paramargs="--$param $config" + else local paramargs= + fi + tagssh $leader "set -x; ydb/src/ydb --tpcc -l --exit-on-recovery -n $# $paramargs $extraargs" & + sleep .1 + # Run initial replicas. + while (( $# > 1 )) ; do + tagssh $1 "set -x; ydb/src/ydb --tpcc -H $leader $paramargs $extraargs" & + shift + done + sleep .1 + # Run joiner. 
+ tagssh $1 "set -x; ydb/src/ydb --tpcc --disk -H $leader --fail-seqno $seqno \ + --yield-build-up --yield-catch-up $paramargs $extraargs" & + if false ; then + if [[ ${wait2:-} ]] + then sleep $wait2 + else read + fi + tagssh $leader "pkill -sigint ydb" + fi + wait +} +drec() { hostargs drec-helper ; } + +nrec-helper() { + local leader=$1 + shift : ${seqno:=100000} ${extraargs:=} - tagssh $leader "set -x; ydb/src/ydb --tpcc -l --exit-on-recovery --accept-joiner-seqno $seqno -n $(( $# - 1 )) $extraargs" & + if [[ $param ]] + then local paramargs="--$param $config" + else local paramargs= + fi + #tagssh $leader "while true; do ps u -p \`pgrep ydb\` 2> /dev/null || true; sleep 2; done& set -x; ydb/src/ydb --tpcc -l --exit-on-recovery --accept-joiner-seqno $seqno -n $(( $# - 1 )) $extraargs; pkill -f 'while true'" & + tagssh $leader "set -x; ydb/src/ydb --tpcc -l --exit-on-recovery --accept-joiner-seqno $seqno -n $(( $# - 1 )) $paramargs $extraargs" & sleep .1 # Run initial replicas. while (( $# > 1 )) ; do - tagssh $1 "set -x; ydb/src/ydb --tpcc -H $leader" & + tagssh $1 "set -x; ydb/src/ydb --tpcc -H $leader $paramargs $extraargs" & shift done sleep .1 # Run joiner. - tagssh $1 "set -x; ydb/src/ydb --tpcc -H $leader --yield-build-up --yield-catch-up $extraargs" & + tagssh $1 "set -x; ydb/src/ydb --tpcc -H $leader --yield-build-up --yield-catch-up $paramargs $extraargs" & if false ; then if [[ ${wait2:-} ]] then sleep $wait2 @@ -415,16 +466,19 @@ fi wait } -rec() { hostargs rec-helper ; } +nrec() { hostargs nrec-helper ; } # Recovery experient. exp-rec() { - for seqno in 300000 200000 100000 ; do # configurations + local cmd=$1 + param=$2 + : ${trials:=3} + shift 2 + for config in "$@" ; do stop - for i in {1..3} ; do # trials - echo === seqno=$seqno i=$i === - echo === seqno=$seqno i=$i === > `tty` - rec + for i in $(seq $trials) ; do + echo === $param=$config i=$i === + $cmd sleep 1 stop sleep .1 @@ -434,19 +488,39 @@ } # Single-host recovery experiment. 
-exp-rec-single() { +exp-nrec-single() { local out=single-log-$(date +%Y-%m-%d-%H:%M:%S) ln -sf $out single-log - exp-rec >& $out + exp-nrec >& $out } # Multi-host recovery experiment. -exp-rec-multi() { +exp-nrec-multi() { local out=multi-log-$(date +%Y-%m-%d-%H:%M:%S) ln -sf $out multi-log - extraargs="-m ${extraargs:-}" exp-rec >& $out + extraargs="-m ${extraargs:-}" exp-rec nrec seqno >& $out } +# Vary DB size +exp-rec-ro() { + local cmd=$1 + local out=$cmd-ro-$(date +%Y-%m-%d-%H:%M:%S) + ln -sf $out $cmd-ro + extraargs="${extraargs:-}" exp-rec $cmd pct-read-only 8 32 64 96 2>&1 | tee $out +} +exp-nrec-ro() { exp-rec-ro nrec ; } +exp-drec-ro() { exp-rec-ro drec ; } + +# Vary DB size +exp-rec-warehouses() { + local cmd=$1 + local out=$cmd-warehouses-$(date +%Y-%m-%d-%H:%M:%S) + ln -sf $out $cmd-warehouses + extraargs="${extraargs:-}" exp-rec $cmd warehouses 4 3 2 1 2>&1 | tee $out +} +exp-nrec-warehouses() { exp-rec-warehouses nrec ; } +exp-drec-warehouses() { exp-rec-warehouses drec ; } + stop-helper() { tagssh $1 'pkill -sigint ydb' } @@ -471,13 +545,14 @@ # Repeat some experiment some number of trials and for varying numbers of hosts. exp-var() { - local name=$1 cmd=$1 stop=${2:-} + local cmd=$1 stop=${2:-} + : ${name:=$cmd} ${trials:=3} local out=$name-log-$(date +%Y-%m-%d-%H:%M:%S-%N) local orighosts="$hosts" maxn=$(( $(echo $hosts | wc -w) - 1 )) ln -sf $out $name-log for n in `seq $maxn -1 ${minn:-1}` ; do # configurations $stop - for i in {1..3} ; do # trials + for i in $(seq $trials) ; do echo === n=$n i=$i === $cmd sleep 1 @@ -494,18 +569,61 @@ # ydb scalability test. 
scaling-helper() { local leader=$1 - : ${stopseqno:=100000} + : ${stopseqno:=300000} shift - tagssh $leader "CPUPROFILE=ydb.prof ydb/src/ydb -q -l -n $# -X $stopseqno --tpcc ${extraargs:-}" & + tagssh $leader "set -x; CPUPROFILE=ydb.prof ydb/src/ydb --tpcc -q -l -n $# -X $stopseqno ${extraargs:-}" & sleep .1 for rep in "$@" - do tagssh $rep "CPUPROFILE=ydb.prof ydb/src/ydb -q -n $# -H $leader --tpcc ${extraargs:-}" & + do tagssh $rep "set -x; CPUPROFILE=ydb.prof ydb/src/ydb --tpcc -q -n $# -H $leader ${extraargs:-}" & done wait } scaling() { hostargs scaling-helper ; } -exp-scaling() { minn=0 exp-var scaling stop ; } +exp-nscaling() { name=nscaling exp-var scaling stop ; } +exp-dscaling() { name=dscaling extraargs=--disk exp-var scaling stop ; } +exp-scaling() { exp-nscaling ; exp-dscaling ; } +# +# For scaling the size of the database. +# + +exp-var-param() { + local name=$1 cmd=$2 stop=${4:-} + param=$3 + shift 4 + : ${trials:=3} + local out=$name-log-$(date +%Y-%m-%d-%H:%M:%S-%N) + ln -sf $out $name-log + for config in "$@" ; do # configurations + $stop + for i in $(seq $trials) ; do + echo === $param=$config i=$i === + $cmd + sleep 1 + if [[ $stop ]] + then $stop; sleep .1 + fi + echo + done + done 2>&1 | tee $out +} + +scaledata-helper() { + local leader=$1 + : ${stopseqno:=300000} + shift + tagssh $leader "set -x; CPUPROFILE=ydb.prof ydb/src/ydb --tpcc -q -l -n $# -X $stopseqno --$param $config ${extraargs:-}" & + sleep .1 + for rep in "$@" + do tagssh $rep "set -x; CPUPROFILE=ydb.prof ydb/src/ydb --tpcc -q -n $# -H $leader --$param $config ${extraargs:-}" & + done + wait +} +scaledata() { hostargs scaledata-helper ; } +exp-nscaledata() { exp-var-param nscaledata scaledata warehouses stop 5 4 3 2 1 ; } +exp-dscaledata() { extraargs=--disk exp-var-param dscaledata scaledata warehouses stop 5 4 3 2 1 ; } +exp-scaledata() { exp-nscaledata ; exp-dscaledata ; } + mtcp-helper() { local leader=$1 n=$(( $# - 1 )) : ${sender:=socat} ${receiver:=socat} This was sent by 
the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
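The shell harness in the commit above fans one leader and N replicas out over ssh and then `wait`s for them. As a rough illustration of the command construction, here is a Python sketch (an assumption throughout: plain `ssh` stands in for the script's `tagssh` wrapper, and the flag layout is copied from `scaling-helper`; `build_commands` and `launch` are hypothetical helper names):

```python
import subprocess

def build_commands(leader, replicas, stopseqno=300000, extraargs=""):
    """Build the per-host command lines the way scaling-helper does:
    the leader gets -l and -X <stopseqno>, each replica gets -H <leader>,
    and -n is the replica count ($# after the shift in the script)."""
    base = "CPUPROFILE=ydb.prof ydb/src/ydb --tpcc -q"
    n = len(replicas)
    cmds = [["ssh", leader,
             ("%s -l -n %d -X %d %s" % (base, n, stopseqno, extraargs)).strip()]]
    for rep in replicas:
        cmds.append(["ssh", rep,
                     ("%s -n %d -H %s %s" % (base, n, leader, extraargs)).strip()])
    return cmds

def launch(cmds):
    """Start every command in the background (the '&' in the script)
    and then wait for all of them (the trailing 'wait')."""
    procs = [subprocess.Popen(c) for c in cmds]
    return [p.wait() for p in procs]
```

With `extraargs="--disk"` this mirrors the `exp-dscaling` variant; `launch` is not exercised here since it would actually ssh out.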
From: <yan...@us...> - 2009-05-12 20:28:51
|
Revision: 1408 http://assorted.svn.sourceforge.net/assorted/?rev=1408&view=rev Author: yangzhang Date: 2009-05-12 20:28:33 +0000 (Tue, 12 May 2009) Log Message: ----------- - added --pct-read-only - started adding --wal - fixed handle_recovery_tpcc's switching among recovery states Modified Paths: -------------- ydb/trunk/src/replica.clamp.lzz ydb/trunk/src/tpcc/tpccclient.cc ydb/trunk/src/tpcc/tpccclient.h ydb/trunk/src/tpcc.clamp.lzz ydb/trunk/src/ydb.clamp.lzz Modified: ydb/trunk/src/replica.clamp.lzz =================================================================== --- ydb/trunk/src/replica.clamp.lzz 2009-05-12 20:24:56 UTC (rev 1407) +++ ydb/trunk/src/replica.clamp.lzz 2009-05-12 20:28:33 UTC (rev 1408) @@ -62,7 +62,7 @@ cout << "loaded " << nwarehouses << " warehouses in " << current_time_millis() - start_time << " ms" << endl; tables->show(); - g_tables->ser(0, 0, seqno); + //g_tables->ser(0, 0, seqno); } recovery_t orig = do_rec_twal ? g_tables->ser(0, 0, seqno) : recovery_t(); Modified: ydb/trunk/src/tpcc/tpccclient.cc =================================================================== --- ydb/trunk/src/tpcc/tpccclient.cc 2009-05-12 20:24:56 UTC (rev 1407) +++ ydb/trunk/src/tpcc/tpccclient.cc 2009-05-12 20:28:33 UTC (rev 1408) @@ -16,7 +16,7 @@ const float TPCCClient::MAX_PAYMENT_AMOUNT; TPCCClient::TPCCClient(Clock* clock, RandomGenerator* generator, TPCCDB* db, int num_items, - int num_warehouses, int districts_per_warehouse, int customers_per_district) : + int num_warehouses, int districts_per_warehouse, int customers_per_district, int pct_read_only) : clock_(clock), generator_(generator), db_(db), @@ -24,6 +24,7 @@ num_warehouses_(num_warehouses), districts_per_warehouse_(districts_per_warehouse), customers_per_district_(customers_per_district) { + setPctReadOnly(pct_read_only); ASSERT(clock_ != NULL); ASSERT(generator_ != NULL); ASSERT(db_ != NULL); @@ -149,21 +150,16 @@ // maintained. This is close to the right thing, but not precisely correct. 
// See TPC-C 5.2.4 (page 68). int x = generator_->number(1, 100); - if (x <= 4) { // 4% - //printf("stocklevel\n"); + if (x <= pct_stock_level_) { // 4% doStockLevel(); - } else if (x <= 8) { // 4% - //printf("delivery\n"); - doDelivery(); - } else if (x <= 12) { // 4% - //printf("orderstatus\n"); + } else if (x <= pct_read_only_) { // 4% doOrderStatus(); - } else if (x <= 12+43) { // 43% - //printf("payment\n"); + } else if (x <= pct_delivery_) { // 4% + doDelivery(); + } else if (x <= pct_payment_) { // 43% doPayment(); } else { // 45% - //printf("neworder\n"); - ASSERT(x > 100-45); + // ASSERT(x > 100-45); doNewOrder(); } } Modified: ydb/trunk/src/tpcc/tpccclient.h =================================================================== --- ydb/trunk/src/tpcc/tpccclient.h 2009-05-12 20:24:56 UTC (rev 1407) +++ ydb/trunk/src/tpcc/tpccclient.h 2009-05-12 20:28:33 UTC (rev 1408) @@ -15,7 +15,8 @@ public: // Owns clock, generator and db. TPCCClient(Clock* clock, RandomGenerator* generator, TPCCDB* db, int num_items, - int num_warehouses, int districts_per_warehouse, int customers_per_district); + int num_warehouses, int districts_per_warehouse, int customers_per_district, + int pct_read_only); ~TPCCClient(); void doStockLevel(); @@ -39,6 +40,15 @@ int32_t generateCID(); int32_t generateItemID(); + void setPctReadOnly(int pct) { + pct_read_only_ = pct; + pct_stock_level_ = pct / 2; + pct_order_status_ = pct_stock_level_ + pct / 2; + int non_read_only = 100 - pct; + pct_delivery_ = pct_order_status_ + non_read_only * 4 / 100; + pct_payment_ = pct_delivery_ + non_read_only * 43 / 100; + } + Clock* clock_; RandomGenerator* generator_; TPCCDB* db_; @@ -46,6 +56,8 @@ int num_warehouses_; int districts_per_warehouse_; int customers_per_district_; + int pct_read_only_, pct_stock_level_, pct_order_status_, pct_delivery_, + pct_payment_; }; #endif Modified: ydb/trunk/src/tpcc.clamp.lzz =================================================================== --- 
ydb/trunk/src/tpcc.clamp.lzz 2009-05-12 20:24:56 UTC (rev 1407) +++ ydb/trunk/src/tpcc.clamp.lzz 2009-05-12 20:28:33 UTC (rev 1408) @@ -31,9 +31,9 @@ using namespace commons; using namespace std; -int snapshot_interval; +int snapshot_interval, pct_read_only; string snapshot_path; -bool do_rec_snapshot; +bool do_rec_snapshot, do_wal; class OrderStatusOutput; class PaymentOutput; @@ -247,7 +247,8 @@ // Client owns all the parameters st_tpcc &tables = *new st_tpcc(fds); TPCCClient client(clock, random, &tables, Item::NUM_ITEMS, nwarehouses, - District::NUM_PER_WAREHOUSE, Customer::NUM_PER_DISTRICT); + District::NUM_PER_WAREHOUSE, Customer::NUM_PER_DISTRICT, + pct_read_only); while (!stop_hub) { // Did we get a new member? If so, notify an arbitrary member (the first // one) to prepare to send recovery information (by sending an @@ -369,6 +370,8 @@ st_intr intr(stop_hub); cout << "node " << show_sockaddr(replica.fd()) << " failed" << endl; delreps.push(replica); + recover_signals.push(current_time_millis()); + loop_cleanup(); { st_intr intr(kill_hub); readmsg(reader, res); @@ -377,6 +380,7 @@ cout << "failed node " << show_sockaddr(replica.fd()) << " resumed; resuming at " << seqno - 1 << endl; last_seqno = seqno - 1; + caught_up = false; newreps.push(replica); } else { @@ -532,7 +536,12 @@ commons::array<char> wbuf(buf_size); anchored_stream_reader reader(st_read_fn(leader), st_read_fully_fn(leader), overflow_fn, rbuf.get(), rbuf.size()); + // TODO + //commit_log cwal; writer w(lambda(const void *buf, size_t len) { + //if (do_wal) { + //__ref(cwal).flush(); + //} st_write(__ref(leader), buf, len); }, wbuf.get(), wbuf.size()); @@ -584,7 +593,6 @@ if (req.seqno() == -1) { // End of stream. - cout << "here" << endl; break; } else if (req.seqno() == -2) { // Prepare recovery msg. 
Modified: ydb/trunk/src/ydb.clamp.lzz =================================================================== --- ydb/trunk/src/ydb.clamp.lzz 2009-05-12 20:24:56 UTC (rev 1407) +++ ydb/trunk/src/ydb.clamp.lzz 2009-05-12 20:28:33 UTC (rev 1408) @@ -163,6 +163,8 @@ "number of txns to process between snapshots (for worker)") ("batch-size,b", po::value<int>(&batch_size)->default_value(100), "number of txns to batch up in each msg (for leader)") + ("wal", po::bool_switch(&do_wal), + "perform ARIES-style write-ahead logging for committed transactions") ("tpcc", po::bool_switch(&do_tpcc), "run the TPCC workload") ("exit-on-seqno,X",po::value<int>(&stop_on_seqno)->default_value(-1), @@ -225,6 +227,8 @@ ("timeout,t", po::value<st_utime_t>(&timeout)->default_value(200000), "timeout for some IO operations that should actually time out (in microseconds)") ("test", "execute unit tests instead of running the normal system") + ("pct-read-only", po::value<int>(&pct_read_only)->default_value(8), + "% of txns that are either STOCK LEVEL/ORDER STATUS") ("minreps,n", po::value<int>(&minreps)->default_value(2), "minimum number of replicas the system is willing to process txns on"); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
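The `setPctReadOnly` arithmetic in this commit is easy to sanity-check with a small Python mirror (a sketch transcribed from the C++ above; note that `doOne` compares the ORDER STATUS roll against `pct_read_only_` itself, which equals `pct_order_status_` whenever the percentage is even):

```python
def mix_thresholds(pct_read_only):
    """Recompute the cumulative thresholds the way setPctReadOnly does:
    the read-only budget is split evenly between STOCK LEVEL and ORDER
    STATUS, and the remainder is carved up 4% / 43% / rest for
    DELIVERY / PAYMENT / NEW ORDER, with integer truncation throughout."""
    pct_stock_level = pct_read_only // 2
    non_read_only = 100 - pct_read_only
    pct_delivery = pct_read_only + non_read_only * 4 // 100
    pct_payment = pct_delivery + non_read_only * 43 // 100
    return pct_stock_level, pct_read_only, pct_delivery, pct_payment

def pick_txn(x, pct_read_only):
    """Classify a roll x in 1..100 the way the rewritten doOne does."""
    sl, ro, dl, pay = mix_thresholds(pct_read_only)
    if x <= sl:
        return "stock_level"
    if x <= ro:
        return "order_status"
    if x <= dl:
        return "delivery"
    if x <= pay:
        return "payment"
    return "new_order"
```

With the default `--pct-read-only=8` this yields a 4/4/3/39/50 mix over rolls 1..100 — slightly off the original 4/4/4/43/45 TPC-C-style mix because of the integer truncation.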
From: <yan...@us...> - 2009-05-12 20:25:11
|
Revision: 1407 http://assorted.svn.sourceforge.net/assorted/?rev=1407&view=rev Author: yangzhang Date: 2009-05-12 20:24:56 +0000 (Tue, 12 May 2009) Log Message: ----------- added sqlite3-based dbm Added Paths: ----------- python-commons/trunk/src/commons/sqlhash.py Added: python-commons/trunk/src/commons/sqlhash.py =================================================================== --- python-commons/trunk/src/commons/sqlhash.py (rev 0) +++ python-commons/trunk/src/commons/sqlhash.py 2009-05-12 20:24:56 UTC (rev 1407) @@ -0,0 +1,170 @@ +#!/usr/bin/env python3.0 +# From <http://code.activestate.com/recipes/576638/> +''' Dbm based on sqlite -- Needed to support shelves + +Key and values are always stored as bytes. This means that when strings are +used they are implicitly converted to the default encoding before being +stored. + +Issues: + + # ??? how to coordinate with whichdb + # ??? Size of text fields fixed or varchar (do we need blobs) + # ??? does default encoding affect str-->bytes or PySqlite3 always use UTF-8 + # ??? 
if pure python overhead and pysqlite overhead is too high, rewrite in C +''' + +__all__ = ['error', 'open'] + +import sqlite3 +import collections +from operator import itemgetter + +error = sqlite3.DatabaseError + +class SQLhash(collections.MutableMapping): + + def __init__(self, filename=':memory:', flags='r', mode=None): + # XXX add flag/mode handling + # c -- create if it doesn't exist + # n -- new empty + # w -- open existing + # r -- readonly + + MAKE_SHELF = 'CREATE TABLE IF NOT EXISTS shelf (key TEXT NOT NULL, value TEXT NOT NULL)' + MAKE_INDEX = 'CREATE UNIQUE INDEX IF NOT EXISTS keyndx ON shelf (key)' + self.conn = sqlite3.connect(filename) + self.conn.text_factory = bytes + self.conn.execute(MAKE_SHELF) + self.conn.execute(MAKE_INDEX) + self.conn.commit() + + def __len__(self): + GET_LEN = 'SELECT COUNT(*) FROM shelf' + return self.conn.execute(GET_LEN).fetchone()[0] + + def keys(self): + return SQLhashKeysView(self) + + def values(self): + return SQLhashValuesView(self) + + def items(self): + return SQLhashItemsView(self) + + def __iter__(self): + return iter(self.keys()) + + def __contains__(self, key): + GET_ITEM = 'SELECT value FROM shelf WHERE key = ?' + return self.conn.execute(GET_ITEM, (key,)).fetchone() is not None + + def __getitem__(self, key): + GET_ITEM = 'SELECT value FROM shelf WHERE key = ?' + item = self.conn.execute(GET_ITEM, (key,)).fetchone() + if item is None: + raise KeyError(key) + return item[0] + + def __setitem__(self, key, value): + ADD_ITEM = 'REPLACE INTO shelf (key, value) VALUES (?,?)' + self.conn.execute(ADD_ITEM, (key, value)) + self.conn.commit() + + def __delitem__(self, key): + if key not in self: + raise KeyError(key) + DEL_ITEM = 'DELETE FROM shelf WHERE key = ?' 
+ self.conn.execute(DEL_ITEM, (key,)) + self.conn.commit() + + def update(self, items=(), **kwds): + if isinstance(items, collections.Mapping): + items = items.items() + UPDATE_ITEMS = 'REPLACE INTO shelf (key, value) VALUES (?, ?)' + self.conn.executemany(UPDATE_ITEMS, items) + self.conn.commit() + if kwds: + self.update(kwds) + + def clear(self): + CLEAR_ALL = 'DELETE FROM shelf; VACUUM;' + self.conn.executescript(CLEAR_ALL) + self.conn.commit() + + def close(self): + if self.conn is not None: + self.conn.commit() + self.conn.close() + self.conn = None + + def __del__(self): + self.close() + +class ListRepr: + + def __repr__(self): + return repr(list(self)) + +class SQLhashKeysView(collections.KeysView, ListRepr): + + def __iter__(self): + GET_KEYS = 'SELECT key FROM shelf ORDER BY ROWID' + return map(itemgetter(0), self._mapping.conn.cursor().execute(GET_KEYS)) + +class SQLhashValuesView(collections.ValuesView, ListRepr): + + def __iter__(self): + GET_VALUES = 'SELECT value FROM shelf ORDER BY ROWID' + return map(itemgetter(0), self._mapping.conn.cursor().execute(GET_VALUES)) + +class SQLhashItemsView(collections.ValuesView, ListRepr): + + def __iter__(self): + GET_ITEMS = 'SELECT key, value FROM shelf ORDER BY ROWID' + return iter(self._mapping.conn.cursor().execute(GET_ITEMS)) + +def open(file=None, *args): + if file is not None: + return SQLhash(file) + return SQLhash() + + +if __name__ in '__main___': + for d in SQLhash(), SQLhash('example'): + print(list(d), "start") + d['abc'] = 'lmno' + print(d['abc']) + d['abc'] = 'rsvp' + d['xyz'] = 'pdq' + print(d.items()) + print(d.values()) + print(d.keys()) + print(list(d), 'list') + d.update(p='x', q='y', r='z') + print(d.items()) + + del d['abc'] + try: + print(d['abc']) + except KeyError: + pass + else: + raise Exception('oh noooo!') + + try: + del d['abc'] + except KeyError: + pass + else: + raise Exception('drat!') + + print(list(d)) + d.clear() + print(list(d)) + d.update(p='x', q='y', r='z') + print(list(d)) 
+ d['xyz'] = 'pdq' + + print() + d.close() |
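The recipe above already carries its own self-test; for readers who just want the core idea, a minimal dict-like object backed by one sqlite3 table looks like this (a sketch keeping only `__getitem__`/`__setitem__`/`__len__`; the full class adds views, `update()`, `clear()`, and `close()`, and `MiniSQLHash` is a hypothetical name):

```python
import sqlite3

class MiniSQLHash:
    """Stripped-down sketch of the SQLhash recipe: a mapping whose
    entries live in a single sqlite3 table rather than in memory."""
    def __init__(self, filename=":memory:"):
        self.conn = sqlite3.connect(filename)
        self.conn.execute("CREATE TABLE IF NOT EXISTS shelf "
                          "(key TEXT PRIMARY KEY, value TEXT NOT NULL)")
    def __setitem__(self, key, value):
        with self.conn:  # the connection context manager commits on success
            self.conn.execute("REPLACE INTO shelf (key, value) VALUES (?, ?)",
                              (key, value))
    def __getitem__(self, key):
        row = self.conn.execute("SELECT value FROM shelf WHERE key = ?",
                                (key,)).fetchone()
        if row is None:
            raise KeyError(key)
        return row[0]
    def __len__(self):
        return self.conn.execute("SELECT COUNT(*) FROM shelf").fetchone()[0]
```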
From: <yan...@us...> - 2009-05-12 20:23:22
|
Revision: 1406 http://assorted.svn.sourceforge.net/assorted/?rev=1406&view=rev Author: yangzhang Date: 2009-05-12 20:23:09 +0000 (Tue, 12 May 2009) Log Message: ----------- fixes in upgrade to python 2.6 Modified Paths: -------------- python-commons/trunk/src/commons/log.py python-commons/trunk/src/commons/progress.py python-commons/trunk/src/commons/seqs.py python-commons/trunk/src/commons/structs.py Modified: python-commons/trunk/src/commons/log.py =================================================================== --- python-commons/trunk/src/commons/log.py 2009-05-12 20:17:06 UTC (rev 1405) +++ python-commons/trunk/src/commons/log.py 2009-05-12 20:23:09 UTC (rev 1406) @@ -15,7 +15,7 @@ # TODO cleanup / reorganize this section def fmt( flag, *args ): - return ' '.join( chain( [ '%-20s:' % flag ], imap( str, args ) ) ) + return ' '.join( chain( [ '%-20s:' % flag ], imap( unicode, args ) ) ) def log( level, flag, *args ): getLogger( flag ).log( level, fmt( flag, *args ) ) Modified: python-commons/trunk/src/commons/progress.py =================================================================== --- python-commons/trunk/src/commons/progress.py 2009-05-12 20:17:06 UTC (rev 1405) +++ python-commons/trunk/src/commons/progress.py 2009-05-12 20:23:09 UTC (rev 1406) @@ -5,8 +5,8 @@ Long-running processes and facilities to let them report their progress. 
""" -from log import * -from seqs import * +from commons.log import * +from commons.seqs import * import urllib2, sys def urlopen_progress( url, output_stream = None, fields = 'OoRrtcl', chunk_size = 4096 ): Modified: python-commons/trunk/src/commons/seqs.py =================================================================== --- python-commons/trunk/src/commons/seqs.py 2009-05-12 20:17:06 UTC (rev 1405) +++ python-commons/trunk/src/commons/seqs.py 2009-05-12 20:23:09 UTC (rev 1406) @@ -32,7 +32,6 @@ pairwise argmax argmin -all concat flatten grouper Modified: python-commons/trunk/src/commons/structs.py =================================================================== --- python-commons/trunk/src/commons/structs.py 2009-05-12 20:17:06 UTC (rev 1405) +++ python-commons/trunk/src/commons/structs.py 2009-05-12 20:23:09 UTC (rev 1406) @@ -19,8 +19,8 @@ def contains_b( self, b ): return b in self.b2a def __len__( self ): return len( self.a2b ) def items( self ): return self.a2b.iteritems() - def as( self ): return self.a2b.keys() - def bs( self ): return self.b2a.keys() + def a_values( self ): return self.a2b.keys() + def b_values( self ): return self.b2a.keys() def clear( self ): self.a2b.clear(); self.b2a.clear() def remove_a( self, a ): b = self.a2b.pop(a) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-12 20:17:27
|
Revision: 1405 http://assorted.svn.sourceforge.net/assorted/?rev=1405&view=rev Author: yangzhang Date: 2009-05-12 20:17:06 +0000 (Tue, 12 May 2009) Log Message: ----------- added sqlalchemy tutorial Modified Paths: -------------- sandbox/trunk/src/py/shelvemem.py Added Paths: ----------- sandbox/trunk/src/py/sqlalch.py Modified: sandbox/trunk/src/py/shelvemem.py =================================================================== --- sandbox/trunk/src/py/shelvemem.py 2009-05-11 21:46:03 UTC (rev 1404) +++ sandbox/trunk/src/py/shelvemem.py 2009-05-12 20:17:06 UTC (rev 1405) @@ -5,10 +5,9 @@ import shelve d = shelve.open('/tmp/shelve', protocol = 2, writeback = True) -print dir(d) for i in xrange(1000000): d[str(i)] = 1000*'A' - if i % 1000 == 0: + if len(d.cache) > 1000: d.sync() with file('/proc/self/status') as f: for line in f: Added: sandbox/trunk/src/py/sqlalch.py =================================================================== --- sandbox/trunk/src/py/sqlalch.py (rev 0) +++ sandbox/trunk/src/py/sqlalch.py 2009-05-12 20:17:06 UTC (rev 1405) @@ -0,0 +1,37 @@ +#!/usr/bin/env python + +from sqlalchemy import * +from sqlalchemy.orm import * +from sqlalchemy.ext.declarative import * + +engine = create_engine('sqlite:///:memory:', echo = True) + +Base = declarative_base() + +class User(Base): + __tablename__ = 'users' + + id = Column(Integer, primary_key = True) + name = Column(String) + fullname = Column(String) + password = Column(String) + + def __init__(self, name, fullname, password): + self.name = name + self.fullname = fullname + self.password = password + + def __repr__(self): + return '<User(%r,%r,%r)>' % (self.name, self.fullname, self.password) + +#users_table = User.__table__ +metadata = Base.metadata +metadata.create_all(engine) + +Session = sessionmaker(bind = engine) +session = Session() + +ed_user = User('ed', 'Ed Jones', 'edpassword') +session.add(ed_user) + + Property changes on: sandbox/trunk/src/py/sqlalch.py 
___________________________________________________________________ Added: svn:executable + * |
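The shelvemem.py change in this revision is the interesting one for memory use: with `writeback=True`, shelve keeps every touched entry in `d.cache` until `sync()`, so flushing when the cache grows — rather than on every 1000th insert — is what actually bounds the resident set. A self-contained sketch of the fixed loop (`fill` is a hypothetical helper name; the sizes are scaled down from the demo):

```python
import os
import shelve
import tempfile

def fill(path, n, flush_at=1000):
    """Write n entries through a writeback shelf, syncing whenever the
    in-memory cache exceeds flush_at entries (the r1405 fix), instead
    of on a fixed insert count (the original r1403 behavior)."""
    d = shelve.open(path, protocol=2, writeback=True)
    for i in range(n):
        d[str(i)] = 1000 * "A"
        if len(d.cache) > flush_at:
            d.sync()  # writes the cached entries back and empties the cache
    d.close()
```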
From: <yan...@us...> - 2009-05-11 21:46:32
|
Revision: 1404 http://assorted.svn.sourceforge.net/assorted/?rev=1404&view=rev Author: yangzhang Date: 2009-05-11 21:46:03 +0000 (Mon, 11 May 2009) Log Message: ----------- fixed some build warnings Modified Paths: -------------- cpp-commons/trunk/src/commons/files.h cpp-commons/trunk/src/commons/versioned_heap.h Modified: cpp-commons/trunk/src/commons/files.h =================================================================== --- cpp-commons/trunk/src/commons/files.h 2009-05-08 10:43:41 UTC (rev 1403) +++ cpp-commons/trunk/src/commons/files.h 2009-05-11 21:46:03 UTC (rev 1404) @@ -108,7 +108,8 @@ closingfd fd(checknnegerr(open(path, O_RDONLY))); check0x(fstat(fd, &sb)); - check(sb.st_size <= 0xffffffff); + // TODO why waaas this herre? + // check(sb.st_size <= 0xffffffff); // TODO Why don't we need (static) cast here? Isn't this a lossy cast? len = sb.st_size; Modified: cpp-commons/trunk/src/commons/versioned_heap.h =================================================================== --- cpp-commons/trunk/src/commons/versioned_heap.h 2009-05-08 10:43:41 UTC (rev 1403) +++ cpp-commons/trunk/src/commons/versioned_heap.h 2009-05-11 21:46:03 UTC (rev 1404) @@ -224,7 +224,7 @@ } char *page; - posix_memalign(reinterpret_cast<void**>(&page), pgsz_, pgsz_); + check0x(posix_memalign(reinterpret_cast<void**>(&page), pgsz_, pgsz_)); hdr &h = *reinterpret_cast<hdr*>(page); h.version = 0; h.index = pages_.size(); @@ -370,8 +370,15 @@ raw_reader r(meta); // Deserialize metadata. - r.read(reinterpret_cast<size_t&>(first_free_)); - r.read(reinterpret_cast<size_t&>(last_free_)); + size_t first_free, last_free; + r.read(first_free); + r.read(last_free); + first_free_ = reinterpret_cast<hdr*>(first_free); + last_free_ = reinterpret_cast<hdr*>(last_free); + // TODO why do next 2 lines this give me this warning? 
+ // error: dereferencing type-punned pointer will break strict-aliasing rules + //r.read(reinterpret_cast<size_t&>(first_free_)); + //r.read(reinterpret_cast<size_t&>(last_free_)); r.read(pgsz_); char *p0 = reinterpret_cast<char*>(data); ind2ptr_mut(first_free_, p0, pgsz_); |
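The versioned_heap.h change replaces `r.read(reinterpret_cast<size_t&>(first_free_))` with reads into plain `size_t` temporaries followed by a cast, which is a standard way to silence the strict-aliasing warning noted in the TODO: the compiler is no longer asked to access a pointer object through a `size_t` lvalue. The same read-into-a-temporary shape, sketched with Python's `struct` module (the first_free/last_free/pgsz field order is taken from the diff; the real C++ on-disk format is an assumption here):

```python
import struct

def ser_meta(first_free, last_free, pgsz):
    """Pack the three heap-metadata fields as native size_t values
    ('N' is native-size-only in the struct module)."""
    return struct.pack("NNN", first_free, last_free, pgsz)

def deser_meta(buf):
    """Read each field into a plain integer first, then convert —
    mirroring the temporaries the commit introduces instead of
    type-punning the destination through a reference."""
    first_free, last_free, pgsz = struct.unpack("NNN", buf)
    return first_free, last_free, pgsz
```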
From: <yan...@us...> - 2009-05-08 10:43:51
|
Revision: 1403 http://assorted.svn.sourceforge.net/assorted/?rev=1403&view=rev Author: yangzhang Date: 2009-05-08 10:43:41 +0000 (Fri, 08 May 2009) Log Message: ----------- added shelvemem Added Paths: ----------- sandbox/trunk/src/py/shelvemem.py Added: sandbox/trunk/src/py/shelvemem.py =================================================================== --- sandbox/trunk/src/py/shelvemem.py (rev 0) +++ sandbox/trunk/src/py/shelvemem.py 2009-05-08 10:43:41 UTC (rev 1403) @@ -0,0 +1,21 @@ +#!/usr/bin/env python + +# Demo of how shelve consumes memory. Answer: they don't! + +import shelve + +d = shelve.open('/tmp/shelve', protocol = 2, writeback = True) +print dir(d) +for i in xrange(1000000): + d[str(i)] = 1000*'A' + if i % 1000 == 0: + d.sync() + with file('/proc/self/status') as f: + for line in f: + if 'VmSize:' in line: + kb = int(line.split()[1]) + break + else: + raise Exception() + print i, kb +d.close() Property changes on: sandbox/trunk/src/py/shelvemem.py ___________________________________________________________________ Added: svn:executable + * This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
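The demo above reads its own VmSize out of `/proc/self/status`; that parsing step can be isolated as follows (Linux-only; `vmsize_kb` is a hypothetical helper name, and the sketch takes the status text as an argument so it can be tested without `/proc`):

```python
def vmsize_kb(status_text):
    """Return the VmSize figure (in kB) from the text of
    /proc/self/status, mirroring the loop in shelvemem.py."""
    for line in status_text.splitlines():
        if line.startswith("VmSize:"):
            return int(line.split()[1])
    raise RuntimeError("no VmSize line in status text")
```

On Linux this would be called as `vmsize_kb(open('/proc/self/status').read())`.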