[pywin32-checkins] pywin32/com/win32comext/bits/test test_bits.py, 1.1.2.1, 1.1.2.2
OLD project page for the Python extensions for Windows
Brought to you by:
mhammond
From: Sidnei da S. <dre...@us...> - 2008-02-06 09:10:23
|
Update of /cvsroot/pywin32/pywin32/com/win32comext/bits/test In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv28263/com/win32comext/bits/test Modified Files: Tag: sidnei-bits test_bits.py Log Message: - A more complete test exercising more of the (implemented) API, now handles errors and uses a callback for printing status, exits when transfer is finished. Index: test_bits.py =================================================================== RCS file: /cvsroot/pywin32/pywin32/com/win32comext/bits/test/Attic/test_bits.py,v retrieving revision 1.1.2.1 retrieving revision 1.1.2.2 diff -C2 -d -r1.1.2.1 -r1.1.2.2 *** test_bits.py 6 Feb 2008 01:51:28 -0000 1.1.2.1 --- test_bits.py 6 Feb 2008 09:10:25 -0000 1.1.2.2 *************** *** 1,6 **** ! import pythoncom, sys, os, time, win32api from win32com.bits import bits ! job_name = 'download-python' bcm = pythoncom.CoCreateInstance(bits.CLSID_BackgroundCopyManager, --- 1,13 ---- ! import win32com.server.policy ! import pythoncom, sys, os, time, win32api, win32event, tempfile from win32com.bits import bits ! TIMEOUT = 200 # ms ! StopEvent = win32event.CreateEvent(None, 0, 0, None) ! ! job_name = 'bits-pywin32-test' ! states = dict([(val, (name.split('_')[-1])) ! for name, val in vars(bits).items() ! if name.startswith('BG_JOB_STATE_')]) bcm = pythoncom.CoCreateInstance(bits.CLSID_BackgroundCopyManager, *************** *** 8,26 **** pythoncom.CLSCTX_LOCAL_SERVER, bits.IID_IBackgroundCopyManager) ! enum = bcm.EnumJobs(0) ! while True: ! jobs = enum.Next() ! if not jobs: ! break ! for job in jobs: ! print job ! job.Cancel() job = bcm.CreateJob(job_name, bits.BG_JOB_TYPE_DOWNLOAD) print job ! job.AddFile(r'http://www.python.org/favicon.ico', r'c:\src\favicon.ico') ! job.Suspend() ! print job.GetState() enum = job.EnumFiles() --- 15,61 ---- pythoncom.CLSCTX_LOCAL_SERVER, bits.IID_IBackgroundCopyManager) ! ! class BackgroundJobCallback(win32com.server.policy.DesignatedWrapPolicy): ! _com_interfaces_ = [bits.IID_IBackgroundCopyCallback] ! _public_methods_ = ["JobTransferred", "JobError", "JobModification"] ! ! def __init__(self): ! self._wrap_(self) ! ! def JobTransferred(self, job): ! print 'Job Transferred', job ! job.Complete() ! win32event.SetEvent(StopEvent) # exit msg pump ! ! def JobError(self, job, error): ! print 'Job Error', job, error ! f = error.GetFile() ! print 'While downloading', f.GetRemoteName() ! print 'To', f.GetLocalName() ! print 'The following error happened:' ! print error.GetErrorDescription(0) ! if f.GetRemoteName().endswith('missing-favicon.ico'): ! print 'Changing to point to correct file' ! f2 = f.QueryInterface(bits.IID_IBackgroundCopyFile2) ! f2.SetRemoteName(r'http://www.python.org/favicon.ico') ! job.Resume() ! else: ! job.Cancel() ! ! def JobModification(self, job, reserved): ! print 'Job Modification', job, states.get(job.GetState()) job = bcm.CreateJob(job_name, bits.BG_JOB_TYPE_DOWNLOAD) print job ! job.SetNotifyInterface(pythoncom.WrapObject(BackgroundJobCallback(), bits.IID_IBackgroundCopyCallback, pythoncom.IID_IUnknown)) ! job.SetNotifyFlags(bits.BG_NOTIFY_JOB_TRANSFERRED | ! bits.BG_NOTIFY_JOB_ERROR | ! bits.BG_NOTIFY_JOB_MODIFICATION) ! ! job.AddFile(r'http://www.python.org/favicon.ico', os.path.join(tempfile.gettempdir(), 'bits-favicon.ico')) ! job.AddFile(r'http://www.non-existing.domain/missing-favicon.ico', os.path.join(tempfile.gettempdir(), 'bits-missing-favicon.ico')) ! ! # job.Suspend() enum = job.EnumFiles() *************** *** 34,47 **** job.Resume() - print job.GetState() ! while job.GetState() != bits.BG_JOB_STATE_TRANSFERRED: ! if job.GetState() in (bits.BG_JOB_STATE_ERROR, ! bits.BG_JOB_STATE_TRANSIENT_ERROR): ! break ! time.sleep(2) ! print job.GetState() ! ! job.Complete() ! import code; code.interact(local=locals()) --- 69,83 ---- job.Resume() ! while True: ! rc = win32event.MsgWaitForMultipleObjects( ! (StopEvent,), ! 0, ! TIMEOUT, ! win32event.QS_ALLEVENTS) ! if rc == win32event.WAIT_OBJECT_0: ! break ! elif rc == win32event.WAIT_OBJECT_0+1: ! if pythoncom.PumpWaitingMessages(): ! break # wm_quit |