From: Alan M. <al...@al...> - 2012-12-05 22:24:14
|
I am trying to allow multiple threads read/write access to pytables data and found it is necessary to call flush() before any read. If not, the latest data is not returned. However, this can cause a RuntimeError. I have tried protecting pytables access with both locks and queues as done by joshayers ( https://github.com/PyTables/PyTables/blob/develop/examples/multiprocess_access_queues.py). In either case I still get RuntimeError: dictionary changed size during iteration when doing the flush. (incidentally using the Locks appears to be much faster than using queues in my unscientific tests...) I have tried versions 2.4 and 2.3.1 with the same results. Interestingly this only appears to happen if there are multiple tables/groups in the H5 file. To investigate this behavior further I create a test program to illustrate (below). When run with num_groups = 5 num_tables = 5 (or greater) I see the runtime error every time. When these values are smaller than this it doesn't (at least in a short test period). I might be doing something unexpected with pytables, but this seems pretty straight forward to me. Any help is appreciated. import tables import threading import random import time lock = threading.Lock() def synchronized(lock): def wrap(f): def newFunction(*args, **kw): lock.acquire() try: return f(*args, **kw) finally: lock.release() return newFunction return wrap class TableValue(tables.IsDescription): a = tables.Int64Col(pos=1) b = tables.UInt32Col(pos=2) class Test(): def __init__(self): self.h5 = None self.h5 = tables.openFile('/data/test.h5', mode='w') self.num_groups = 5 self.num_tables = 5 self.groups = [self.h5.createGroup('/', "group%d"%i) for i in range(self.num_groups)] self.tables = [] for group in self.groups: tbls = [self.h5.createTable(group, 'table%d'%i, TableValue) for i in range(self.num_tables)] self.tables.append (tbls) for table in tbls: table.cols.a.createIndex() self.stats = {'read': 0, 'write': 0} @synchronized(lock) def __del__(self): if self.h5 != None: self.h5.close() self.h5 = None @synchronized(lock) def write(self): x = self.tables[random.randint(0, self.num_groups-1)][random.randint(0, self.num_tables-1)].row x['a'] = random.randint(0, 100) x['b'] = random.randint(0, 100) x.append() self.stats['write'] += 1 @synchronized(lock) def read(self): # flush so we can query latest data self.h5.flush() table = self.tables[random.randint(0, self.num_groups-1)][random.randint(0, self.num_tables-1)] # do some query results = table.readWhere('a > %d'%(random.randint(0, 100))) #print 'Query got %d hits'%(len(results)) self.stats['read'] += 1 class Worker(threading.Thread): def __init__(self, method): threading.Thread.__init__(self) self.method = method self.stopEvt = threading.Event() def run(self): while not self.stopEvt.is_set(): self.method() time.sleep(random.random()/100.0) def stop(self): self.stopEvt.set() def main(): t = Test() threads = [Worker(t.write) for _i in range(10)] threads.extend([Worker(t.read) for _i in range(10)]) for thread in threads: thread.start() time.sleep(5) for thread in threads: thread.stop() for thread in threads: thread.join() print t.stats if __name__ == "__main__": main() |