[Assorted-commits] SF.net SVN: assorted: [240] python-afx/trunk/src/afx/pubsub.py
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-01-18 22:28:59
|
Revision: 240 http://assorted.svn.sourceforge.net/assorted/?rev=240&view=rev Author: yangzhang Date: 2008-01-18 14:28:57 -0800 (Fri, 18 Jan 2008) Log Message: ----------- - added result to condvar.wait - fixed tag() - flexible pickler - fixed read_pickle - touchups Modified Paths: -------------- python-afx/trunk/src/afx/pubsub.py Modified: python-afx/trunk/src/afx/pubsub.py =================================================================== --- python-afx/trunk/src/afx/pubsub.py 2008-01-18 09:01:43 UTC (rev 239) +++ python-afx/trunk/src/afx/pubsub.py 2008-01-18 22:28:57 UTC (rev 240) @@ -8,9 +8,9 @@ from cStringIO import StringIO from itertools import chain from struct import unpack -from cPickle import dumps, load +from cPickle import load from commons.log import warning -from commons.seqs import streamlen, safe_pickle +from commons.seqs import streamlen, safe_pickler import af stop = 'stop' @@ -53,10 +53,11 @@ self.waiters = [] self.token = token @af.task - def wait( self ): + def wait( self, result = None ): c = af.channel( 1 ) self.waiters.append( c ) yield c.get() + yield af.result( result ) def notify_one( self ): if len( self.waiters ) > 0: res = self.waiters.pop().try_put( self.token ) @@ -261,42 +262,56 @@ @af.task def tag( task, attach ): result = yield task - yield ( attach, result ) + yield af.result( ( attach, result ) ) -@af.task -def write_pickle( socket, obj ): - yield socket.write( safe_pickle( obj ) ) +class cell( object ): + """ + Holds a single value, and replaces it on each put. + Puts are also non-yielding. + """ + def __init__( self ): + self.q = af.channel( 1 ) + def put( self, x ): + deplete( self.q ) + res = self.q.try_put( x ) + assert res + @af.task + def get( self ): + yield af.result( ( yield self.q.get() ) ) @af.task -def read_pickle( socket, init = '', length_thresh = 100000 ): - with closing( StringIO() ) as stream: +def read_pickle( read, init = '', length_thresh = 100000 ): + with closing( StringIO() ) as sio: obj = None # return this if we hit eof (not enough bytes read) + sio.write( init ) @af.task def read_until(target): - remain = target - streamlen(stream) + remain = target - streamlen(sio) if remain > 0: - chunk = yield socket.read( remain ) + chunk = yield read( remain ) # append to end - stream.seek(0,2) - stream.write( chunk ) - yield af.result( stream.tell() >= target ) + sio.seek(0,2) + sio.write( chunk ) + offset = streamlen(sio) + sio.seek(0) + yield af.result( offset >= target ) if ( yield read_until(4) ): - stream.seek(0) - lengthstr = stream.read(4) + lengthstr = sio.read(4) (length,) = unpack('i4', lengthstr) - if length_thresh is not None and length > length_thresh or length <= 0: + if length_thresh is not None and length > length_thresh or \ + length <= 0: warning( 'read_pickle', 'got length', length, - 'streamlen', streamlen(stream), + 'streamlen', streamlen(sio), 'first bytes %x %x %x %x' % tuple(map(ord,lengthstr)) ) if ( yield read_until(length + 4) ): # start reading from right after header - stream.seek(4) - obj = load( stream ) + sio.seek(4) + obj = load(sio) - yield af.result( ( obj, stream.read() ) ) + yield af.result( ( obj, sio.read() ) ) #### stream.write( init ) #### while True: @@ -309,33 +324,22 @@ #### else: #### yield af.result( ( obj, stream.read() ) ) -class cell( object ): - """ - Holds a single value, and replaces it on each put. - Puts are also non-yielding. - """ - def __init__( self ): - self.q = af.channel( 1 ) - def put( self, x ): - deplete( self.q ) - res = self.q.try_put( x ) - assert res - @af.task - def get( self ): - yield af.result( ( yield self.q.get() ) ) - class socket_unpickler( object ): """ Pickle objects directly to a socket stream. """ + def __init__( self, s ): self.s = s self.buffer = '' + @af.task def read( self ): - obj, self.buffer = yield read_pickle( self.s, self.buffer ) + obj, self.buffer = yield read_pickle( self.s.read, self.buffer ) yield af.result( obj ) + + class socket_line_reader( object ): """ A line-reading interface to socket streams. @@ -343,6 +347,7 @@ def __init__( self, s ): self.s = s self.buffer = [] + @af.task def read_line( self ): """ @@ -362,6 +367,7 @@ yield af.result( line ) else: self.buffer.append( chunk[ start : ] ) + @af.task def read_some_lines( self ): """ @@ -381,15 +387,17 @@ else: self.buffer.append( chunk ) + + @af.task def get_traceback(): - import sys - try: raise Exception() - except: yield af.result( sys.exc_info()[2] ) + import sys + try: raise Exception() + except: yield af.result( sys.exc_info()[2] ) @af.task def get_tb_string(): - import traceback - tb = yield get_traceback() - yield af.result( '\n'.join( traceback.format_tb( tb ) ) ) + import traceback + tb = yield get_traceback() + yield af.result( '\n'.join( traceback.format_tb( tb ) ) ) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |