From: Alan M. <al...@al...> - 2012-12-10 16:18:19
I'm continuing to fight this error. As a sanity check I rewrote my sample
app as a single thread only. With interleaved reads/writes to multiple
tables I still get "RuntimeError: dictionary changed size during iteration"
in flush. I still think there is some underlying problem, or something I
don't understand about pytables/hdf5. I'm far from an expert on either of
these, so I'd appreciate any suggestions, or even confirmation that I'm
not completely crazy. The following code should work, right?

import tables
import random
import datetime

# a simple table
class TableValue(tables.IsDescription):
    a = tables.Int64Col(pos=1)
    b = tables.UInt32Col(pos=2)

class Test():
    def __init__(self):
        self.stats = {'read': 0,
                      'write': 0,
                      'read_error': 0,
                      'write_error': 0}
        self.h5 = None
        self.h5 = tables.openFile('/data/test.h5', mode='w')
        self.num_groups = 5
        self.num_tables = 5
        # create num_groups
        self.groups = [self.h5.createGroup('/', "group%d" % i)
                       for i in range(self.num_groups)]
        self.tables = []
        # create num_tables in each group we just created
        for group in self.groups:
            tbls = [self.h5.createTable(group, 'table%d' % i, TableValue)
                    for i in range(self.num_tables)]
            self.tables.append(tbls)
            for table in tbls:
                # add an index for good measure
                table.cols.a.createIndex()

    def write(self):
        # select a random table and write to it
        x = self.tables[random.randint(0, self.num_groups-1)][random.randint(0, self.num_tables-1)].row
        x['a'] = random.randint(0, 100)
        x['b'] = random.randint(0, 100)
        x.append()
        self.stats['write'] += 1

    def read(self):
        # first flush any cached data
        self.h5.flush()
        # then select a random table
        table = self.tables[random.randint(0, self.num_groups-1)][random.randint(0, self.num_tables-1)]
        # and do some random query
        table.readWhere('a > %d' % (random.randint(0, 100)))
        self.stats['read'] += 1

    def close(self):
        self.h5.close()

def main():
    t = Test()
    start = datetime.datetime.now()
    # run for 10 seconds
    while (datetime.datetime.now() - start < datetime.timedelta(seconds=10)):
        # randomly do a read or a write
        if random.random() > 0.5:
            t.write()
        else:
            t.read()
    print t.stats
    print "Done"
    t.close()

if __name__ == "__main__":
    main()
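
One workaround that may be worth trying here — a sketch only, resting on an
unverified guess that the RuntimeError comes from File.flush() walking
PyTables' internal node registry while the index machinery mutates it.
Instead of flushing the whole file before each query, flush only the table
being read via Table.flush() (which is part of the PyTables 2.x API); that
narrows the flush to a single node and avoids touching every group on every
read. A drop-in replacement for Test.read() above; whether it actually
sidesteps the error is something to verify:

def read(self):
    # pick the table first...
    table = self.tables[random.randint(0, self.num_groups-1)][random.randint(0, self.num_tables-1)]
    # ...then flush just that table's buffers (and its index), rather
    # than calling self.h5.flush() on the whole file
    table.flush()
    # and do some random query, as before
    table.readWhere('a > %d' % (random.randint(0, 100)))
    self.stats['read'] += 1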
On Thu, Dec 6, 2012 at 9:55 AM, Alan Marchiori <al...@al...> wrote:

> Josh,
>
> Thanks for the detailed response. I would like to avoid going through a
> separate process if at all possible due to the performance penalty. I
> have also tried your last suggestion to create a dedicated pytables
> thread and send everything through that, but I still see the same
> problem (RuntimeError in flush). This leads me to believe something
> strange is going on behind the scenes.
>
> Updated test program with a dedicated pytables thread reading an input
> Queue.Queue:
>
> import tables
> import threading
> import random
> import time
> import Queue
>
> # a simple table
> class TableValue(tables.IsDescription):
>     a = tables.Int64Col(pos=1)
>     b = tables.UInt32Col(pos=2)
>
> class TablesThread(threading.Thread):
>     def __init__(self):
>         threading.Thread.__init__(self)
>         self.name = 'HDF5 io thread'
>         # create the dummy HDF5 file
>         self.h5 = None
>         self.h5 = tables.openFile('/data/test.h5', mode='w')
>         self.num_groups = 5
>         self.num_tables = 5
>         self.groups = [self.h5.createGroup('/', "group%d" % i)
>                        for i in range(self.num_groups)]
>         self.tables = []
>         for group in self.groups:
>             tbls = [self.h5.createTable(group, 'table%d' % i, TableValue)
>                     for i in range(self.num_tables)]
>             self.tables.append(tbls)
>             for table in tbls:
>                 # add an index for good measure
>                 table.cols.a.createIndex()
>         self.stopEvt = threading.Event()
>         self.stoppedEvt = threading.Event()
>         self.inputQ = Queue.Queue()
>
>     def run(self):
>         try:
>             while not self.stopEvt.is_set():
>                 # get a command
>                 try:
>                     cmd, args, result = self.inputQ.get(timeout=0.5)
>                 except Queue.Empty:
>                     # poll stopEvt so we can shut down
>                     continue
>
>                 # do the command
>                 if cmd == 'write':
>                     x = self.tables[args[0]][args[1]].row
>                     x['a'] = args[2]
>                     x['b'] = args[3]
>                     x.append()
>                 elif cmd == 'read':
>                     self.h5.flush()
>                     table = self.tables[args[0]][args[1]]
>                     result.value = table.readWhere('a > %d' % (args[2],))
>                 else:
>                     raise Exception("Command not supported: %s" % (cmd,))
>
>                 # signal that the result is ready
>                 result.event.set()
>
>         finally:
>             # shutdown
>             self.h5.close()
>             self.stoppedEvt.set()
>
>     def stop(self):
>         if not self.stoppedEvt.is_set():
>             self.stopEvt.set()
>             self.stoppedEvt.wait()
>
> class ResultEvent():
>     def __init__(self):
>         self.event = threading.Event()
>         self.value = None
>
> class Test():
>     def __init__(self):
>         self.tables = TablesThread()
>         self.tables.start()
>         self.timeout = 5
>         self.stats = {'read': 0,
>                       'write': 0,
>                       'read_error': 0,
>                       'write_error': 0}
>
>     def write(self):
>         r = ResultEvent()
>         self.tables.inputQ.put(('write',
>                                 (random.randint(0, self.tables.num_groups-1),
>                                  random.randint(0, self.tables.num_tables-1),
>                                  random.randint(0, 100),
>                                  random.randint(0, 100)),
>                                 r))
>         r.event.wait(timeout=self.timeout)
>         if r.event.is_set():
>             self.stats['write'] += 1
>         else:
>             self.stats['write_error'] += 1
>
>     def read(self):
>         r = ResultEvent()
>         self.tables.inputQ.put(('read',
>                                 (random.randint(0, self.tables.num_groups-1),
>                                  random.randint(0, self.tables.num_tables-1),
>                                  random.randint(0, 100)),
>                                 r))
>         r.event.wait(timeout=self.timeout)
>         if r.event.is_set():
>             self.stats['read'] += 1
>             #print 'Query got %d hits' % (len(r.value),)
>         else:
>             self.stats['read_error'] += 1
>
>     def close(self):
>         self.tables.stop()
>
>     def __del__(self):
>         self.close()
>
> class Worker(threading.Thread):
>     def __init__(self, method):
>         threading.Thread.__init__(self)
>         self.method = method
>         self.stopEvt = threading.Event()
>
>     def run(self):
>         while not self.stopEvt.is_set():
>             try:
>                 self.method()
>             except Exception, x:
>                 print 'Worker thread failed with: %s' % (x,)
>             time.sleep(random.random()/100.0)
>
>     def stop(self):
>         self.stopEvt.set()
>
> def main():
>     t = Test()
>
>     threads = [Worker(t.write) for _i in range(10)]
>     threads.extend([Worker(t.read) for _i in range(10)])
>
>     for thread in threads:
>         thread.start()
>
>     time.sleep(5)
>
>     for thread in threads:
>         thread.stop()
>
>     for thread in threads:
>         thread.join()
>
>     t.close()
>
>     print t.stats
>
> if __name__ == "__main__":
>     main()
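
One detail worth noting about the program above: TablesThread.__init__
runs in whichever thread constructs the object, so the HDF5 file is opened
and the tables and indexes created on the main thread, while all later
reads, writes, and the final close happen on the worker thread. Below is a
sketch of restructuring so that every HDF5 call happens on the one worker
thread; whether this affects the RuntimeError is an assumption to test,
but it makes the one-thread-owns-the-file rule airtight:

import tables
import threading
import Queue

class TablesThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.inputQ = Queue.Queue()
        self.stopEvt = threading.Event()
        self.ready = threading.Event()  # set once the file is usable

    def run(self):
        # open the file *on this thread*, so creation, all reads and
        # writes, and the close share a single thread for the file's
        # whole lifetime
        h5 = tables.openFile('/data/test.h5', mode='w')
        try:
            # ... create groups/tables/indexes here, then signal ...
            self.ready.set()
            while not self.stopEvt.is_set():
                try:
                    cmd, args, result = self.inputQ.get(timeout=0.5)
                except Queue.Empty:
                    continue
                # ... dispatch 'read'/'write' exactly as in the program
                # above, then result.event.set() ...
        finally:
            h5.close()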
>
> On Wed, Dec 5, 2012 at 10:52 PM, Josh Ayers <jos...@gm...> wrote:
>
>> Alan,
>>
>> Unfortunately, the underlying HDF5 library isn't thread-safe by
>> default. It can be built in a thread-safe mode that serializes all API
>> calls, but still doesn't allow actual parallel access to the disk. See
>> [1] for more details. Here's [2] another interesting discussion
>> concerning whether multithreaded access is actually beneficial for an
>> I/O-limited library like HDF5. Ultimately, if one thread can read at
>> the disk's maximum transfer rate, then multiple threads don't provide
>> any benefit.
>>
>> Beyond the limitations of HDF5, PyTables also maintains global state
>> in various module-level variables. One example is the _open_file cache
>> in the file.py module. I made an attempt in the past to work around
>> this to allow read-only access from multiple threads, but didn't make
>> much progress.
>>
>> In general, I think your best bet is to serialize all access through
>> a single process. There is another example in the PyTables/examples
>> directory that benchmarks different methods of transferring data from
>> PyTables to another process [3]. It compares Python's
>> multiprocessing.Queue, sockets, and memory-mapped files. In my
>> testing, the latter two are 5-10x faster than using a queue.
>>
>> Another option would be to use multiple threads, but handle all
>> access to the HDF5 file in one thread. PyTables will release the GIL
>> when making HDF5 library calls, so the other threads will be able to
>> run. You could use a Queue.Queue or some other mechanism to transfer
>> data between threads. No actual copying would be needed since their
>> memory is shared, which should make it faster than the multi-process
>> techniques.
>>
>> Hope that helps.
>>
>> Josh Ayers
>>
>> [1]: http://www.hdfgroup.org/hdf5-quest.html#mthread
>>
>> [2]: https://visitbugs.ornl.gov/projects/8/wiki/Multi-threaded_cores_and_HPC-HDF5
>>
>> [3]: https://github.com/PyTables/PyTables/blob/develop/examples/multiprocess_access_benchmarks.py
>>
>> On Wed, Dec 5, 2012 at 2:24 PM, Alan Marchiori <al...@al...> wrote:
>>
>>> I am trying to allow multiple threads read/write access to pytables
>>> data and found it is necessary to call flush() before any read; if
>>> not, the latest data is not returned. However, this can cause a
>>> RuntimeError. I have tried protecting pytables access with both locks
>>> and queues as done by joshayers
>>> (https://github.com/PyTables/PyTables/blob/develop/examples/multiprocess_access_queues.py).
>>> In either case I still get "RuntimeError: dictionary changed size
>>> during iteration" when doing the flush. (Incidentally, using locks
>>> appears to be much faster than using queues in my unscientific
>>> tests...)
>>>
>>> I have tried versions 2.4 and 2.3.1 with the same results.
>>> Interestingly, this only appears to happen if there are multiple
>>> tables/groups in the H5 file. To investigate this behavior further I
>>> created a test program to illustrate (below). When run with
>>> num_groups = 5 and num_tables = 5 (or greater) I see the runtime
>>> error every time. When these values are smaller it doesn't happen
>>> (at least over a short test period).
>>>
>>> I might be doing something unexpected with pytables, but this seems
>>> pretty straightforward to me. Any help is appreciated.
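
For reference, this is roughly the shape of the lock-based serialization
mentioned in the original Dec 5 message — a sketch only, since the thread
reports hitting the flush error even with locks. H5_LOCK and the helper
names are illustrative, not part of any PyTables API:

import threading
import tables

H5_LOCK = threading.Lock()  # a single lock guards every PyTables call

h5 = tables.openFile('/data/test.h5', mode='w')

def locked_write(table, a, b):
    # serialize appends with the same lock used for reads and flushes
    with H5_LOCK:
        row = table.row
        row['a'] = a
        row['b'] = b
        row.append()

def locked_read(table, threshold):
    # flush and query under the lock so no other thread can touch the
    # file (or PyTables' internal node caches) mid-flush
    with H5_LOCK:
        h5.flush()
        return table.readWhere('a > %d' % threshold)

The appeal over a queue is that no hand-off or copying happens at all;
the cost is that every operation, including long queries, serializes
behind the one lock.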