Thread: [cx-oracle-users] passing a connection to a process?
From: Chuck W. <chu...@ch...> - 2010-06-20 04:22:42
Hello -- I am trying to export data from an Oracle 10 db (~1TB) to a Netezza db. The ORCL db has >150 tables, which range from a few KB to 300 GB.

I am using Python 2.6.5/cx_Oracle to download the data as bzipped csv files and then loading them into Netezza. I have written a script which spawns as many processes as specified (using the threading, Queue and subprocess modules) and pulls down the data. The large tables are pulled down as multiple files, each of which is a partition. Briefly, here's the code which spawns the processes:

====
jobq = Queue.Queue(...)
# put stuff in the queue

def worker():
    while True:
        j = jobq.get()
        p = subprocess.Popen(command + " --job=%s" % j,
                             stdout=subprocess.PIPE, shell=True)
        p.wait()
        jobq.task_done()

for i in xrange(int(options.nproc)):
    t = threading.Thread(target=worker)
    t.setDaemon(True)
    t.start()

jobq.join()
====

Since I am spawning multiple processes, and the cx_Oracle connection object is not picklable, each process has to start and close its own connection (which adds about 15 seconds per process). While that is fine for the larger tables, it adds a lot of time for the 100 or so small tables.

Is there any way in which all the connections can be started by the master and passed to the worker processes? I realize I could easily spawn multiple threads and just use one process. However, with all the compression that needs to happen, I am thinking that would be much slower. Thank you.
From: Amaury F. d'A. <ama...@gm...> - 2010-06-20 17:47:48
Hi,

2010/6/20 Chuck White <chu...@ch...>:
> [...]
> Is there any way in which all the connections can be started by the master and passed
> to the worker processes? I realize I could easily spawn multiple threads and just use
> one process. However, with all the compression that needs to happen, I am thinking
> that would be much slower.

Did you actually try? The compression functions can truly run concurrently in several threads, since they take care to release the Python "Global Interpreter Lock".

Another solution is to use the "multiprocessing" module: a multiprocessing.Pool keeps the spawned processes alive between jobs, and you can cache the connection in a global variable.

-- 
Amaury Forgeot d'Arc
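The GIL point above can be checked with a short sketch (modern Python shown; the payload and thread count are made up): bz2.compress drops the GIL while the C compression code runs, so several threads can compress different buffers on different cores at the same time.

```python
import bz2
import threading

# Hypothetical payload: in the real script each thread would hold a
# CSV chunk fetched from Oracle.
data = b"some,csv,data\n" * 100000

results = {}

def compress(idx):
    # bz2.compress releases the GIL for the duration of the call, so
    # these threads genuinely run in parallel on a multi-core machine.
    results[idx] = bz2.compress(data)

threads = [threading.Thread(target=compress, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Each thread produced a valid archive that round-trips to the input.
assert all(bz2.decompress(blob) == data for blob in results.values())
```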
From: Chuck W. <chu...@ch...> - 2010-06-20 18:47:22
Thanks for your response.

---- Amaury Forgeot d'Arc <ama...@gm...> wrote:
> Another solution is to use the "multiprocessing" module: a
> multiprocessing.Pool keeps the spawned processes alive between jobs,
> and you can cache the connection in a global variable.

Can you please post an example with the multiprocessing module? My understanding is that objects have to be picklable to spawn processes using multiprocessing. Maybe I am missing something here.

> Did you actually try? The compression functions can truly run
> concurrently in several threads, since they take care to release
> the Python "Global Interpreter Lock".

No, I did not try. Thanks.
From: Amaury F. d'A. <ama...@gm...> - 2010-06-20 20:40:32
2010/6/20 Chuck White <chu...@ch...>:
> Thanks for your response.
>
> Can you please post an example with the multiprocessing module? My understanding is
> that objects have to be picklable to spawn processes using multiprocessing. Maybe I
> am missing something here.

The parameters have to be picklable, yes. But not the state of the worker process...

In the following sample, the "connection" object would be a cx_Oracle connection. The important thing is that it's a global variable, so it is reused the next time the worker process gets a new task.

connection = None

def worker(param):
    import time
    time.sleep(0.1)
    global connection
    if connection is None:
        print "%d Connect" % param
        connection = param
    else:
        print "%d Connected by %d" % (param, connection)
    return param + 10

if __name__ == '__main__':
    import multiprocessing
    pool = multiprocessing.Pool(processes=4)
    results = []
    for x in range(10):
        def callback(result, x=x):
            results.append((x, result))
        pool.apply_async(worker, [x], callback=callback)
    pool.close()
    pool.join()
    print "results", results

-- 
Amaury Forgeot d'Arc
From: Chuck W. <chu...@ch...> - 2010-06-21 14:29:37
Thank you for sending me the example. It worked nicely. If you don't mind, I do have a couple of questions about it:

1. When can I close the connection? Do I have to send a decoy job which is identified by the worker, which in turn closes the connection? Or is there a better way?

2. I see that multiprocessing has support for logging. Is it possible to instantiate logging in the main code, and have the workers write to it?

Thanks again.

---- Amaury Forgeot d'Arc <ama...@gm...> wrote:
> 2010/6/20 Chuck White <chu...@ch...>:
> > Thanks for your response.
> >
> > Can you please post an example with the multiprocessing module? My understanding
> > is that objects have to be picklable to spawn processes using multiprocessing.
> > Maybe I am missing something here.
>
> The parameters have to be picklable, yes. But not the state of the
> worker process...
> In the following sample, the "connection" object would be a cx_Oracle
> connection.
> The important thing is that it's a global variable, so it is reused the next
> time the worker process gets a new task.