Update of /cvsroot/pywin32/pywin32/com/win32com/test
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21864
Modified Files:
testMarshal.py testall.py
Log Message:
Ressurect old stale test
Index: testall.py
===================================================================
RCS file: /cvsroot/pywin32/pywin32/com/win32com/test/testall.py,v
retrieving revision 1.25
retrieving revision 1.26
diff -C2 -d -r1.25 -r1.26
*** testall.py 11 Oct 2004 06:46:14 -0000 1.25
--- testall.py 31 May 2005 09:28:16 -0000 1.26
***************
*** 77,81 ****
testAXScript testxslt testDictionary testCollections
testServers errorSemantics.test testvb testArrays
! testClipboard
""".split(),
# Level 2 tests.
--- 77,81 ----
testAXScript testxslt testDictionary testCollections
testServers errorSemantics.test testvb testArrays
! testClipboard testMarshal
""".split(),
# Level 2 tests.
Index: testMarshal.py
===================================================================
RCS file: /cvsroot/pywin32/pywin32/com/win32com/test/testMarshal.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -C2 -d -r1.2 -r1.3
*** testMarshal.py 18 Nov 2002 11:20:07 -0000 1.2
--- testMarshal.py 31 May 2005 09:28:16 -0000 1.3
***************
*** 26,42 ****
import win32event, win32api
import pythoncom
! from testservers import TestInterp
freeThreaded = 1
! def TestInterpInThread(stopEvent, interp):
! try:
! DoTestInterpInThread(interp)
! finally:
! win32event.SetEvent(stopEvent)
! def DoTestInterpInThread(interp):
! try:
pythoncom.CoInitialize()
myThread = win32api.GetCurrentThreadId()
--- 26,43 ----
import win32event, win32api
import pythoncom
+ import unittest
! from testServers import InterpCase
freeThreaded = 1
! class ThreadInterpCase(InterpCase):
! def _testInterpInThread(self, stopEvent, interp):
! try:
! self._doTestInThread(interp)
! finally:
! win32event.SetEvent(stopEvent)
! def _doTestInThread(self, interp):
pythoncom.CoInitialize()
myThread = win32api.GetCurrentThreadId()
***************
*** 46,122 ****
interp = win32com.client.Dispatch(interp)
- TestInterp(interp)
interp.Exec("import win32api")
! print "The test thread id is %d, Python.Interpreter's thread ID is %d" % (myThread, interp.Eval("win32api.GetCurrentThreadId()"))
interp = None
pythoncom.CoUninitialize()
- except:
- traceback.print_exc()
! def BeginThreadsSimpleMarshal(numThreads):
! """Creates multiple threads using simple (but slower) marshalling.
!
! Single interpreter object, but a new stream is created per thread.
!
! Returns the handles the threads will set when complete.
! """
! interp = win32com.client.Dispatch("Python.Interpreter")
! ret = []
! for i in range(numThreads):
! hEvent = win32event.CreateEvent(None, 0, 0, None)
! interpStream = pythoncom.CoMarshalInterThreadInterfaceInStream(pythoncom.IID_IDispatch, interp._oleobj_)
! thread.start_new(TestInterpInThread, (hEvent, interpStream))
! ret.append(hEvent)
! return ret
! #
! # NOTE - this doesnt quite work - Im not even sure it should, but Greg reckons
! # you should be able to avoid the marshal per thread!
! def BeginThreadsFastMarshal(numThreads):
! """Creates multiple threads using fast (but complex) marshalling.
! The marshal stream is created once, and each thread uses the same stream
! Returns the handles the threads will set when complete.
! """
! interp = win32com.client.Dispatch("Python.Interpreter")
! if freeThreaded:
! interp = pythoncom.CoMarshalInterThreadInterfaceInStream(pythoncom.IID_IDispatch, interp._oleobj_)
! ret = []
! for i in range(numThreads):
! hEvent = win32event.CreateEvent(None, 0, 0, None)
! thread.start_new(TestInterpInThread, (hEvent, interp))
! ret.append(hEvent)
! return ret
! def test(fn):
! print "The main thread is %d" % (win32api.GetCurrentThreadId())
! events = fn(2)
! numFinished = 0
! while 1:
! try:
! # Specifying "bWaitAll" here seems to prevent messages???
! rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
! if rc >= win32event.WAIT_OBJECT_0 and rc < win32event.WAIT_OBJECT_0+len(events):
! numFinished = numFinished + 1
! if numFinished >= len(events):
! break
! elif rc==win32event.WAIT_OBJECT_0 + len(events): # a message
! # This is critical - whole apartment model demo will hang.
! pythoncom.PumpWaitingMessages()
! else: # Timeout
! print "Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount())
! except KeyboardInterrupt:
! break
if __name__=='__main__':
! test(BeginThreadsSimpleMarshal)
! # test(BeginThreadsFastMarshal)
! win32api.Sleep(500)
! # Doing CoUninit here stop Pythoncom.dll hanging when DLLMain shuts-down the process
! pythoncom.CoUninitialize()
! if pythoncom._GetInterfaceCount()!=0 or pythoncom._GetGatewayCount()!=0:
! print "Done with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount())
! else:
! print "Done."
--- 47,125 ----
interp = win32com.client.Dispatch(interp)
interp.Exec("import win32api")
! #print "The test thread id is %d, Python.Interpreter's thread ID is %d" % (myThread, interp.Eval("win32api.GetCurrentThreadId()"))
interp = None
pythoncom.CoUninitialize()
! def BeginThreadsSimpleMarshal(self, numThreads):
! """Creates multiple threads using simple (but slower) marshalling.
!
! Single interpreter object, but a new stream is created per thread.
!
! Returns the handles the threads will set when complete.
! """
! interp = win32com.client.Dispatch("Python.Interpreter")
! ret = []
! for i in range(numThreads):
! hEvent = win32event.CreateEvent(None, 0, 0, None)
! interpStream = pythoncom.CoMarshalInterThreadInterfaceInStream(pythoncom.IID_IDispatch, interp._oleobj_)
! thread.start_new(self._testInterpInThread, (hEvent, interpStream))
! ret.append(hEvent)
! return ret
! #
! # NOTE - this doesnt quite work - Im not even sure it should, but Greg reckons
! # you should be able to avoid the marshal per thread!
! def BeginThreadsFastMarshal(self, numThreads):
! """Creates multiple threads using fast (but complex) marshalling.
!
! The marshal stream is created once, and each thread uses the same stream
!
! Returns the handles the threads will set when complete.
! """
! interp = win32com.client.Dispatch("Python.Interpreter")
! if freeThreaded:
! interp = pythoncom.CoMarshalInterThreadInterfaceInStream(pythoncom.IID_IDispatch, interp._oleobj_)
! ret = []
!
! for i in range(numThreads):
! hEvent = win32event.CreateEvent(None, 0, 0, None)
! thread.start_new(self._testInterpInThread, (hEvent, interp))
! ret.append(hEvent)
! return ret
! def _DoTestMarshal(self, fn, bCoWait = 0):
! #print "The main thread is %d" % (win32api.GetCurrentThreadId())
! events = fn(2)
! numFinished = 0
! while 1:
! try:
! if bCoWait:
! rc = pythoncom.CoWaitForMultipleHandles(0, 2000, events)
! else:
! # Specifying "bWaitAll" here will wait for messages *and* all events
! # (which is pretty useless)
! rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
! if rc >= win32event.WAIT_OBJECT_0 and rc < win32event.WAIT_OBJECT_0+len(events):
! numFinished = numFinished + 1
! if numFinished >= len(events):
! break
! elif rc==win32event.WAIT_OBJECT_0 + len(events): # a message
! # This is critical - whole apartment model demo will hang.
! pythoncom.PumpWaitingMessages()
! else: # Timeout
! print "Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount())
! except KeyboardInterrupt:
! break
! def testSimpleMarshal(self):
! self._DoTestMarshal(self.BeginThreadsSimpleMarshal)
! def testSimpleMarshalCoWait(self):
! self._DoTestMarshal(self.BeginThreadsSimpleMarshal, 1)
! # def testFastMarshal(self):
! # self._DoTestMarshal(self.BeginThreadsFastMarshal)
if __name__=='__main__':
! unittest.main('testMarshal')
|