[pywin32-checkins] pywin32/com/win32com/test testMarshal.py,1.3,1.4
OLD project page for the Python extensions for Windows
Brought to you by:
mhammond
From: Mark H. <mha...@us...> - 2005-05-31 12:32:05
|
Update of /cvsroot/pywin32/pywin32/com/win32com/test
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv23130

Modified Files:
    testMarshal.py
Log Message:
Convert to threading module and wait for threads to stop

Index: testMarshal.py
===================================================================
RCS file: /cvsroot/pywin32/pywin32/com/win32com/test/testMarshal.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -C2 -d -r1.3 -r1.4
*** testMarshal.py 31 May 2005 09:28:16 -0000 1.3
--- testMarshal.py 31 May 2005 12:31:50 -0000 1.4
***************
*** 22,26 ****
  """
  
! import thread, traceback
  import win32com.client
  import win32event, win32api
--- 22,26 ----
  """
  
! import threading, traceback
  import win32com.client
  import win32event, win32api
***************
*** 49,53 ****
          interp.Exec("import win32api")
          #print "The test thread id is %d, Python.Interpreter's thread ID is %d" % (myThread, interp.Eval("win32api.GetCurrentThreadId()"))
-         interp = None
          pythoncom.CoUninitialize()
  
--- 49,52 ----
***************
*** 60,74 ****
          """
          interp = win32com.client.Dispatch("Python.Interpreter")
!         ret = []
          for i in range(numThreads):
              hEvent = win32event.CreateEvent(None, 0, 0, None)
              interpStream = pythoncom.CoMarshalInterThreadInterfaceInStream(pythoncom.IID_IDispatch, interp._oleobj_)
!             thread.start_new(self._testInterpInThread, (hEvent, interpStream))
!             ret.append(hEvent)
!         return ret
  
      #
      # NOTE - this doesnt quite work - Im not even sure it should, but Greg reckons
      # you should be able to avoid the marshal per thread!
      def BeginThreadsFastMarshal(self, numThreads):
          """Creates multiple threads using fast (but complex) marshalling.
--- 59,79 ----
          """
          interp = win32com.client.Dispatch("Python.Interpreter")
!         events = []
!         threads = []
          for i in range(numThreads):
              hEvent = win32event.CreateEvent(None, 0, 0, None)
+             events.append(hEvent)
              interpStream = pythoncom.CoMarshalInterThreadInterfaceInStream(pythoncom.IID_IDispatch, interp._oleobj_)
!             t = threading.Thread(target=self._testInterpInThread, args=(hEvent, interpStream))
!             t.setDaemon(1) # so errors dont cause shutdown hang
!             t.start()
!             threads.append(t)
!         interp = None
!         return threads, events
  
      #
      # NOTE - this doesnt quite work - Im not even sure it should, but Greg reckons
      # you should be able to avoid the marshal per thread!
+     # I think that refers to CoMarshalInterface though...
      def BeginThreadsFastMarshal(self, numThreads):
          """Creates multiple threads using fast (but complex) marshalling.
***************
*** 81,95 ****
          if freeThreaded:
              interp = pythoncom.CoMarshalInterThreadInterfaceInStream(pythoncom.IID_IDispatch, interp._oleobj_)
!         ret = []
!         for i in range(numThreads):
              hEvent = win32event.CreateEvent(None, 0, 0, None)
!             thread.start_new(self._testInterpInThread, (hEvent, interp))
!             ret.append(hEvent)
!         return ret
  
      def _DoTestMarshal(self, fn, bCoWait = 0):
          #print "The main thread is %d" % (win32api.GetCurrentThreadId())
!         events = fn(2)
          numFinished = 0
          while 1:
--- 86,103 ----
          if freeThreaded:
              interp = pythoncom.CoMarshalInterThreadInterfaceInStream(pythoncom.IID_IDispatch, interp._oleobj_)
!         events = []
!         threads = []
          for i in range(numThreads):
              hEvent = win32event.CreateEvent(None, 0, 0, None)
!             t = threading.Thread(target=self._testInterpInThread, args=(hEvent, interp))
!             t.setDaemon(1) # so errors dont cause shutdown hang
!             t.start()
!             events.append(hEvent)
!             threads.append(t)
!         return threads, events
  
      def _DoTestMarshal(self, fn, bCoWait = 0):
          #print "The main thread is %d" % (win32api.GetCurrentThreadId())
!         threads, events = fn(2)
          numFinished = 0
          while 1:
***************
*** 112,115 ****
--- 120,130 ----
              except KeyboardInterrupt:
                  break
+         for t in threads:
+             t.join(2)
+             self.failIf(t.isAlive(), "thread failed to stop!?")
+         threads = None # threads hold references to args
+         # Seems to be a leak here I can't locate :(
+         #self.failUnlessEqual(pythoncom._GetInterfaceCount(), 0)
+         #self.failUnlessEqual(pythoncom._GetGatewayCount(), 0)
  
      def testSimpleMarshal(self):
|