Update of /cvsroot/pywin32/pywin32/win32/test
In directory ddv4jf1.ch3.sourceforge.com:/tmp/cvs-serv18841/win32/test
Modified Files:
Tag: py3k
test_win32api.py test_win32file.py
Log Message:
merge datetime changes from trunk (and other misc trunk changes)
Index: test_win32file.py
===================================================================
RCS file: /cvsroot/pywin32/pywin32/win32/test/test_win32file.py,v
retrieving revision 1.13.2.7
retrieving revision 1.13.2.8
diff -C2 -d -r1.13.2.7 -r1.13.2.8
*** test_win32file.py 14 Jan 2009 12:42:03 -0000 1.13.2.7
--- test_win32file.py 27 Jan 2009 07:55:32 -0000 1.13.2.8
***************
*** 3,6 ****
--- 3,7 ----
import win32api, win32file, win32pipe, pywintypes, winerror, win32event
import win32con, ntsecuritycon
+ import win32timezone
import sys
import os
***************
*** 11,14 ****
--- 12,16 ----
import socket
import datetime
+ import random
try:
***************
*** 121,128 ****
os.unlink(filename)
def testFileTimes(self):
if issubclass(pywintypes.TimeType, datetime.datetime):
! from win32timezone import GetLocalTimeZone
! now = datetime.datetime.now(tz=GetLocalTimeZone())
nowish = now + datetime.timedelta(seconds=1)
later = now + datetime.timedelta(seconds=120)
--- 123,157 ----
os.unlink(filename)
+ def testFileTimesTimezones(self):
+ if not issubclass(pywintypes.TimeType, datetime.datetime):
+ # maybe should report 'skipped', but that's not quite right as
+ # there is nothing you can do to avoid it being skipped!
+ return
+ filename = tempfile.mktemp("-testFileTimes")
+ now_utc = win32timezone.utcnow()
+ now_local = now_utc.astimezone(win32timezone.TimeZoneInfo.local())
+ h = win32file.CreateFile(filename,
+ win32file.GENERIC_READ|win32file.GENERIC_WRITE,
+ 0, None, win32file.CREATE_ALWAYS, 0, 0)
+ try:
+ win32file.SetFileTime(h, now_utc, now_utc, now_utc)
+ ct, at, wt = win32file.GetFileTime(h)
+ self.failUnlessEqual(now_local, ct)
+ self.failUnlessEqual(now_local, at)
+ self.failUnlessEqual(now_local, wt)
+ # and the reverse - set local, check against utc
+ win32file.SetFileTime(h, now_local, now_local, now_local)
+ ct, at, wt = win32file.GetFileTime(h)
+ self.failUnlessEqual(now_utc, ct)
+ self.failUnlessEqual(now_utc, at)
+ self.failUnlessEqual(now_utc, wt)
+ finally:
+ h.close()
+ os.unlink(filename)
+
def testFileTimes(self):
if issubclass(pywintypes.TimeType, datetime.datetime):
! from win32timezone import TimeZoneInfo
! now = datetime.datetime.now(tz=TimeZoneInfo.local())
nowish = now + datetime.timedelta(seconds=1)
later = now + datetime.timedelta(seconds=120)
***************
*** 297,301 ****
raise
finally:
! handle.Close()
t.join(3)
self.failIf(t.isAlive(), "thread didn't finish")
--- 326,331 ----
raise
finally:
! if not test_overlapped_death:
! handle.Close()
t.join(3)
self.failIf(t.isAlive(), "thread didn't finish")
***************
*** 556,559 ****
--- 586,708 ----
os.unlink(fname)
+ class TestConnect(unittest.TestCase):
+ def test_connect_with_payload(self):
+ def runner():
+ s1 = socket.socket()
+ self.addr = ('localhost', random.randint(10000,64000))
+ s1.bind(self.addr)
+ s1.listen(1)
+ cli, addr = s1.accept()
+ self.request = cli.recv(1024)
+ cli.send(str2bytes('some expected response'))
+ t = threading.Thread(target=runner)
+ t.start()
+ time.sleep(0.1)
+ s2 = socket.socket()
+ ol = pywintypes.OVERLAPPED()
+ s2.bind(('0.0.0.0', 0)) # connectex requires the socket be bound beforehand
+ win32file.ConnectEx(s2, self.addr, ol, str2bytes("some expected request"))
+ win32file.GetOverlappedResult(s2.fileno(), ol, 1)
+ ol = pywintypes.OVERLAPPED()
+ buff = win32file.AllocateReadBuffer(1024)
+ win32file.WSARecv(s2, buff, ol, 0)
+ length = win32file.GetOverlappedResult(s2.fileno(), ol, 1)
+ self.response = buff[:length]
+ self.assertEqual(self.response, str2bytes('some expected response'))
+ self.assertEqual(self.request, str2bytes('some expected request'))
+ t.join(5)
+ self.failIf(t.isAlive(), "worker thread didn't terminate")
+
+ def test_connect_without_payload(self):
+ def runner():
+ s1 = socket.socket()
+ self.addr = ('localhost', random.randint(10000,64000))
+ s1.bind(self.addr)
+ s1.listen(1)
+ cli, addr = s1.accept()
+ cli.send(str2bytes('some expected response'))
+ t = threading.Thread(target=runner)
+ t.start()
+ time.sleep(0.1)
+ s2 = socket.socket()
+ ol = pywintypes.OVERLAPPED()
+ s2.bind(('0.0.0.0', 0)) # connectex requires the socket be bound beforehand
+ win32file.ConnectEx(s2, self.addr, ol)
+ win32file.GetOverlappedResult(s2.fileno(), ol, 1)
+ ol = pywintypes.OVERLAPPED()
+ buff = win32file.AllocateReadBuffer(1024)
+ win32file.WSARecv(s2, buff, ol, 0)
+ length = win32file.GetOverlappedResult(s2.fileno(), ol, 1)
+ self.response = buff[:length]
+ self.assertEqual(self.response, str2bytes('some expected response'))
+ t.join(5)
+ self.failIf(t.isAlive(), "worker thread didn't terminate")
+
+ class TestTransmit(unittest.TestCase):
+ def test_transmit(self):
+ import binascii
+ val = binascii.hexlify(os.urandom(1024*1024))
+ val_length = len(val)
+ f = tempfile.TemporaryFile()
+ f.write(val)
+
+ def runner():
+ s1 = socket.socket()
+ self.addr = ('localhost', random.randint(10000,64000))
+ s1.bind(self.addr)
+ s1.listen(1)
+ cli, addr = s1.accept()
+ buf = 1
+ self.request = []
+ while buf:
+ buf = cli.recv(1024*100)
+ self.request.append(buf)
+
+ th = threading.Thread(target=runner)
+ th.start()
+ time.sleep(0.5)
+ s2 = socket.socket()
+ s2.connect(self.addr)
+
+ length = 0
+ aaa = str2bytes("[AAA]")
+ bbb = str2bytes("[BBB]")
+ ccc = str2bytes("[CCC]")
+ ddd = str2bytes("[DDD]")
+ empty = str2bytes("")
+ ol = pywintypes.OVERLAPPED()
+ f.seek(0)
+ win32file.TransmitFile(s2, win32file._get_osfhandle(f.fileno()), val_length, 0, ol, 0)
+ length += win32file.GetOverlappedResult(s2.fileno(), ol, 1)
+
+ ol = pywintypes.OVERLAPPED()
+ f.seek(0)
+ win32file.TransmitFile(s2, win32file._get_osfhandle(f.fileno()), val_length, 0, ol, 0, aaa, bbb)
+ length += win32file.GetOverlappedResult(s2.fileno(), ol, 1)
+
+ ol = pywintypes.OVERLAPPED()
+ f.seek(0)
+ win32file.TransmitFile(s2, win32file._get_osfhandle(f.fileno()), val_length, 0, ol, 0, empty, empty)
+ length += win32file.GetOverlappedResult(s2.fileno(), ol, 1)
+
+ ol = pywintypes.OVERLAPPED()
+ f.seek(0)
+ win32file.TransmitFile(s2, win32file._get_osfhandle(f.fileno()), val_length, 0, ol, 0, None, ccc)
+ length += win32file.GetOverlappedResult(s2.fileno(), ol, 1)
+
+ ol = pywintypes.OVERLAPPED()
+ f.seek(0)
+ win32file.TransmitFile(s2, win32file._get_osfhandle(f.fileno()), val_length, 0, ol, 0, ddd)
+ length += win32file.GetOverlappedResult(s2.fileno(), ol, 1)
+
+ s2.close()
+ th.join()
+ buf = str2bytes('').join(self.request)
+ self.assertEqual(length, len(buf))
+ expected = val + aaa + val + bbb + val + val + ccc + ddd + val
+ self.assertEqual(type(expected), type(buf))
+ self.assert_(expected == buf)
+
+
if __name__ == '__main__':
unittest.main()
Index: test_win32api.py
===================================================================
RCS file: /cvsroot/pywin32/pywin32/win32/test/test_win32api.py,v
retrieving revision 1.13.2.4
retrieving revision 1.13.2.5
diff -C2 -d -r1.13.2.4 -r1.13.2.5
*** test_win32api.py 7 Jan 2009 06:25:16 -0000 1.13.2.4
--- test_win32api.py 27 Jan 2009 07:55:32 -0000 1.13.2.5
***************
*** 7,10 ****
--- 7,11 ----
import sys, os
import tempfile
+ import datetime
class CurrentUserTestCase(unittest.TestCase):
***************
*** 25,29 ****
# for the sake of code exercise but don't output
tz_str.encode()
! tz_time.Format()
def TestDateFormat(self):
DATE_LONGDATE = 2
--- 26,31 ----
# for the sake of code exercise but don't output
tz_str.encode()
! if not isinstance(tz_time, datetime.datetime):
! tz_time.Format()
def TestDateFormat(self):
DATE_LONGDATE = 2
***************
*** 116,120 ****
self.failUnlessEqual(long_name, win32api.GetLongPathNameW(short_name))
long_name = win32api.GetLongPathNameW(short_name)
! ## self.failUnless(type(long_name)==unicode, "GetLongPathNameW returned type '%s'" % (type(long_name),))
self.failUnless(long_name==fname, \
"Expected long name ('%s') to be original name ('%s')" % (long_name, fname))
--- 118,122 ----
self.failUnlessEqual(long_name, win32api.GetLongPathNameW(short_name))
long_name = win32api.GetLongPathNameW(short_name)
! self.failUnless(type(long_name)==str, "GetLongPathNameW returned type '%s'" % (type(long_name),))
self.failUnless(long_name==fname, \
"Expected long name ('%s') to be original name ('%s')" % (long_name, fname))
***************
*** 127,132 ****
fname = os.path.abspath(me)
# passing unicode should cause GetShortPathNameW to be called.
! short_name = win32api.GetShortPathName(fname)
! ## self.failUnless(isinstance(short_name, unicode))
long_name = win32api.GetLongPathName(short_name)
self.failUnless(long_name==fname, \
--- 129,134 ----
fname = os.path.abspath(me)
# passing unicode should cause GetShortPathNameW to be called.
! short_name = win32api.GetShortPathName(str(fname))
! self.failUnless(isinstance(short_name, str))
long_name = win32api.GetLongPathName(short_name)
self.failUnless(long_name==fname, \
***************
*** 134,138 ****
self.failUnlessEqual(long_name, win32api.GetLongPathNameW(short_name))
long_name = win32api.GetLongPathNameW(short_name)
! ## self.failUnless(type(long_name)==unicode, "GetLongPathNameW returned type '%s'" % (type(long_name),))
self.failUnless(long_name==fname, \
"Expected long name ('%s') to be original name ('%s')" % (long_name, fname))
--- 136,140 ----
self.failUnlessEqual(long_name, win32api.GetLongPathNameW(short_name))
long_name = win32api.GetLongPathNameW(short_name)
! self.failUnless(type(long_name)==str, "GetLongPathNameW returned type '%s'" % (type(long_name),))
self.failUnless(long_name==fname, \
"Expected long name ('%s') to be original name ('%s')" % (long_name, fname))
***************
*** 162,166 ****
raise
! attr = win32api.GetFileAttributes(fname)
self.failUnless(attr & win32con.FILE_ATTRIBUTE_DIRECTORY, attr)
--- 164,168 ----
raise
! attr = win32api.GetFileAttributes(str(fname))
self.failUnless(attr & win32con.FILE_ATTRIBUTE_DIRECTORY, attr)
|