[Pymoul-svn] SF.net SVN: pymoul: [100] pymoul/trunk/src/moul/file
Status: Alpha
Brought to you by:
tiran
From: <ti...@us...> - 2007-01-30 01:20:57
|
Revision: 100 http://pymoul.svn.sourceforge.net/pymoul/?rev=100&view=rev Author: tiran Date: 2007-01-29 17:20:56 -0800 (Mon, 29 Jan 2007) Log Message: ----------- Fixed misc bugs in plasmalog and chatlog Added KIImageFixer class with tests Modified Paths: -------------- pymoul/trunk/src/moul/file/chatlog.py pymoul/trunk/src/moul/file/kiimage.py pymoul/trunk/src/moul/file/plasmalog.py pymoul/trunk/src/moul/file/tests/test_kiimage.py pymoul/trunk/src/moul/file/tests/test_plasmalog.py Modified: pymoul/trunk/src/moul/file/chatlog.py =================================================================== --- pymoul/trunk/src/moul/file/chatlog.py 2007-01-29 22:07:42 UTC (rev 99) +++ pymoul/trunk/src/moul/file/chatlog.py 2007-01-30 01:20:56 UTC (rev 100) @@ -22,7 +22,7 @@ (MM/DD hh:mm:ss) Chat.log started... (MM/DD hh:mm:ss) ...Chat.log stopped. -(MM/DD hh:mm:ss) From USER in LOCATION: msg +(MM/DD hh:mm:ss) From USER in LOCATION: MSG (MM/DD hh:mm:ss) Error: ERRORMSG (MM/DD hh:mm:ss) USER: TEXT (note the two spaces!) (MM/DD hh:mm:ss) To USER: TEXT @@ -64,7 +64,6 @@ # User: message ) - LOG = getLogger('moul.chat') def modtime(pathname): Modified: pymoul/trunk/src/moul/file/kiimage.py =================================================================== --- pymoul/trunk/src/moul/file/kiimage.py 2007-01-29 22:07:42 UTC (rev 99) +++ pymoul/trunk/src/moul/file/kiimage.py 2007-01-30 01:20:56 UTC (rev 100) @@ -24,13 +24,38 @@ import os import tempfile import struct +from stat import ST_SIZE +from stat import ST_MTIME +from fnmatch import fnmatch +from moul.log import getLogger + JPEG_HEADER = "\377\330\377" -class KiImageError(ValueError): +LOG = getLogger("moul.kiimage") + +class KIImageError(ValueError): pass -class KiImage(object): +def fixedNewer(ki, fixed): + """Check file size and mod date to see if the fixed image is newer + """ + kistat = os.stat(ki) + fixedstat = os.stat(fixed) + if kistat[ST_SIZE] +4 == fixedstat[ST_SIZE]: + # ok, KI is 4 bytes larger + if fixed[ST_MTIME] > ki[ST_MTIME]: + # ok, fixed is newer + return True + else: + return False + else: + return None + +MOUL_IMAGE = 1 +JPEG_IMAGE = 2 + +class KIImage(object): """Ki image handler MOUL's KI images have four leading bytes of junk that encode the file @@ -44,17 +69,26 @@ else: name = getattr(fd_name, 'name', '<UNKNOWN>') fd = fd_name - + self._filename = name self._fd = fd self._size = None + self._filetype = None def close(self): + """Close file handler + """ if self._fd: self._fd.close() self._fd = None - def getSize(self): + def getFileSize(self): + """Get size of file + + Uses the standard seek(0,2)+tell() syntax to get the size of the file. + The file size is cached in self._size. The function resets the + position of the file pointer at the end of the call. + """ if self._size is not None: return self._size fd = self._fd @@ -69,6 +103,12 @@ return size def moulHeaderToSize(self, header=None): + """Convert a MOUL header to file size in int + + NOTE: A MOUL file is jpeg size + 4 bytes header size + + If header is not given the header of the current file is used. + """ # XXX use struct if header is None: fd = self._fd @@ -80,9 +120,13 @@ return size def sizeToMoulHeader(self, size=None): + """Converts a file size in int to a MOUL header string + + NOTE: A MOUL file is jpeg size + 4 bytes header size + """ # XXX use struct if size is None: - size = self.getSize() + size = self.getFileSize() leading = 4* [None] for i in (3,2,1,0): l = size >> 8*i @@ -92,31 +136,54 @@ def verifyMoulHeader(self, header=None): header_size = self.moulHeaderToSize(header) - file_size = self.getSize() + file_size = self.getFileSize() if (header_size + 4) == file_size: return True return False def isJpeg(self): + """Check if file is a JPEG image w/o MOUL header + + The method only checks the header. It doesn't verify the whole + file. + """ + if self._filetype is not None: + return self._filetype == JPEG_IMAGE fd = self._fd fd.seek(0) data = fd.read(3) if data == JPEG_HEADER and not self.isMoulImage(): + self._filetype = JPEG_IMAGE return True - return False + else: + return False def isMoulImage(self): + """Check if file is a MOUL file (JPEG file with MOUL header) + + The method only checks the header. It doesn't verify the whole + file but it verifies the MOUL header. + """ + if self._filetype is not None: + return self._filetype == MOUL_IMAGE fd = self._fd opos = fd.tell() fd.seek(4) data = fd.read(3) if data == JPEG_HEADER and self.verifyMoulHeader(): + self._filetype = MOUL_IMAGE return True - return False + else: + return False + + def removeMoulHeader(self): + """Remove MOUL header from an image - def removeMoulHeader(self): + The method doesn't chance the current file. It returns a file + descriptor to a temporary file. + """ if not self.isMoulImage(): - raise KiImageError('Image has no MOUL header') + raise KIImageError('Image has no MOUL header') out = tempfile.TemporaryFile() fd = self._fd fd.seek(4) @@ -125,10 +192,15 @@ return out def addMoulHeader(self): + """Add MOUL header to an image + + The method doesn't chance the current file. It returns a file + descriptor to a temporary file. + """ if self.isMoulImage(): - raise KiImageError('Image has already a MOUL header') + raise KIImageError('Image has already a MOUL header') if not self.isJpeg(): - raise KiImageError('File is not a JPEG') + raise KIImageError('File is not a JPEG') out = tempfile.TemporaryFile() header = self.sizeToMoulHeader() fd = self._fd @@ -138,3 +210,62 @@ out.write(fd.read()) out.seek(0) return out + +class KIImageFixer(object): + """Create fixed images in a new directory + """ + _pat = ("*.jpg",) + + def __init__(self, srcdir, destdir): + self._srcdir = srcdir + self._destdir = destdir + self._found = [] # all found files + self._tocheck = [] # found files to check + self._fixed = [] # fixed files + + if not os.path.isdir(srcdir): + LOG.critical("%s is not a directory" % srcdir) + return False + if not os.path.isdir(destdir): + LOG.info("Creating chatlog directory %s" % destdir) + os.mkdir(destdir) + + def findFiles(self): + """Find filess + """ + for root, dirs, files in os.walk(self._srcdir): + for name in files: + matches = [pat for pat in self._pat + if fnmatch(name, pat)] + if matches: + ki = os.path.join(root, name) + fixed = os.path.join(self._destdir, name) + self._found.append(ki) + if os.path.isfile(fixed): + if fixedNewer(ki, fixed): + LOG.debug("File %s exists but was changed." % name) + self._tocheck.append((ki, fixed)) + else: + LOG.debug("File %s exists and is fixed." % name) + else: + self._tocheck.append((ki, fixed)) + def checkAndCopyFiles(self): + """Check files if they are KI images + + If the file is a KI image than copy the file to the new location + """ + for kiname, fixedname in self._tocheck: + ki = KIImage(kiname) + if ki.isMoulImage(): + tmp = ki.removeMoulHeader() + fixed = open(fixedname, 'wb') + while True: + buf = tmp.read(4096) + if not buf: + break + fixed.write(buf) + LOG.info("Created fixed image %s" % fixedname) + self._fixed.append((kiname, fixedname)) + fixed.close() + tmp.close() + ki.close() Modified: pymoul/trunk/src/moul/file/plasmalog.py =================================================================== --- pymoul/trunk/src/moul/file/plasmalog.py 2007-01-29 22:07:42 UTC (rev 99) +++ pymoul/trunk/src/moul/file/plasmalog.py 2007-01-30 01:20:56 UTC (rev 100) @@ -106,20 +106,30 @@ The removeLogs function removes only files considered as safe """ + rmdir = [] for root, dirs, files in os.walk(self._srcdir, topdown=False): for name in files: matches = [pat for pat in self._save_patterns if fnmatch(name, pat)] - fpath = os.path.join(root, name) if matches: - os.remove(fpath) + os.remove(os.path.join(root, name)) else: - self._not_removed.append(fpath) + self._not_removed.append((root, name)) LOG.warning("Won't remove %s from %s" % (name, root)) for name in dirs: - os.rmdir(os.path.join(root, name)) + rmdir.append(os.path.join(root, name)) - os.rmdir(self._srcdir) + rmdir.append(self._srcdir) + for d in rmdir: + try: + os.rmdir(d) + except OSError, err: + LOG.exception("Could not delete directory") + self._not_removed.append(d) + if self._not_removed: + return False + else: + return True def __call__(self): self.zipLogDir() Modified: pymoul/trunk/src/moul/file/tests/test_kiimage.py =================================================================== --- pymoul/trunk/src/moul/file/tests/test_kiimage.py 2007-01-29 22:07:42 UTC (rev 99) +++ pymoul/trunk/src/moul/file/tests/test_kiimage.py 2007-01-30 01:20:56 UTC (rev 100) @@ -26,15 +26,15 @@ import unittest from doctest import DocTestSuite -from moul.file.kiimage import KiImage -from moul.file.kiimage import KiImageError +from moul.file.kiimage import KIImage +from moul.file.kiimage import KIImageError from moul.file.kiimage import JPEG_HEADER base = os.path.dirname(__file__) kiimg = os.path.join(base, 'avatar.jpg') kiclean = os.path.join(base, 'avatar_clean.jpg') -class KiImageTest(unittest.TestCase): +class KIImageTest(unittest.TestCase): def setUp(self): self._ki = open(kiimg, 'rb') @@ -45,29 +45,29 @@ self._clean.close() def test_openname(self): - k = KiImage(kiimg) + k = KIImage(kiimg) self.failUnless(k.verifyMoulHeader()) self.failUnless(k.isMoulImage()) self.failIf(k.isJpeg()) - k = KiImage(kiclean) + k = KIImage(kiclean) self.failIf(k.verifyMoulHeader()) self.failIf(k.isMoulImage()) self.failUnless(k.isJpeg()) def test_openfd(self): - k = KiImage(self._ki) + k = KIImage(self._ki) self.failUnless(k.verifyMoulHeader()) self.failUnless(k.isMoulImage()) self.failIf(k.isJpeg()) - k = KiImage(self._clean) + k = KIImage(self._clean) self.failIf(k.verifyMoulHeader()) self.failIf(k.isMoulImage()) self.failUnless(k.isJpeg()) def test_removeheader(self): - k = KiImage(self._ki) + k = KIImage(self._ki) fd = k.removeMoulHeader() data = fd.read() cleandata = self._clean.read() @@ -76,7 +76,7 @@ "file mismatch %r:%r" % (data[:8], cleandata[:8])) def test_addheader(self): - k = KiImage(self._clean) + k = KIImage(self._clean) fd = k.addMoulHeader() data = fd.read() kidata = self._ki.read() @@ -86,7 +86,7 @@ def test_suite(): return unittest.TestSuite(( - unittest.makeSuite(KiImageTest), + unittest.makeSuite(KIImageTest), DocTestSuite('moul.file.kiimage') )) Modified: pymoul/trunk/src/moul/file/tests/test_plasmalog.py =================================================================== --- pymoul/trunk/src/moul/file/tests/test_plasmalog.py 2007-01-29 22:07:42 UTC (rev 99) +++ pymoul/trunk/src/moul/file/tests/test_plasmalog.py 2007-01-30 01:20:56 UTC (rev 100) @@ -30,6 +30,8 @@ from moul.file.plasmalog import PlasmalogZipper from moul.file.chatlog import ChatlogMover +from moul.file.kiimage import KIImageFixer +from moul.file.kiimage import KIImage FILE_LIST = ['audio.0.elf', 'audiocaps.0.elf', 'audioTimes.0.elf', 'Avatar.0.elf', 'impacts.0.elf', 'LocalizationDataMgr.0.elf', @@ -70,6 +72,8 @@ _fakeLogdir(os.path.join(path, 'Logs', 'faketest')) shutil.copyfile(os.path.join(base, 'avatar.jpg'), os.path.join(path, 'Avatars', '001.jpg')) + shutil.copyfile(os.path.join(base, 'avatar.jpg'), + os.path.join(path, 'KIimages', 'KIimage001.jpg')) shutil.copyfile(os.path.join(base, 'graphics.ini'), os.path.join(path, 'init', 'graphics.ini')) shutil.copyfile(os.path.join(base, 'audio.ini'), @@ -78,19 +82,24 @@ class PlasmaChatTest(unittest.TestCase): def setUp(self): self._tmpdir = mkdtemp() + self._logdir = os.path.join(self._tmpdir, 'Logs') + self._kidir = os.path.join(self._tmpdir, 'KIimages') + self._avatardir = os.path.join(self._tmpdir, 'Avatars') + self._chatdest = os.path.join(self._tmpdir, 'Chatlogs') self._logdest = os.path.join(self._tmpdir, 'ZippedLogs') - self._logdir = os.path.join(self._tmpdir, 'Logs') + + self._kidest = os.path.join(self._tmpdir, 'FixedImages') _fakeUruLiveDir(self._tmpdir) - self._clm = ChatlogMover(self._logdir, self._chatdest) - self._plz = PlasmalogZipper(self._logdir, self._logdest) - def test_createDir(self): + def test_000dirs(self): + for d in (self._tmpdir, self._logdir, self._kidir, self._avatardir): + self.failUnless(os.path.isdir(d), d) + + def test_plasmaLogZipper(self): + plz = PlasmalogZipper(self._logdir, self._logdest) self.failUnless(os.path.isdir(self._logdest), self._logdest) - self.failUnless(os.path.isdir(self._chatdest), self._chatdest) - def test_plasmaLogZipper(self): - plz = self._plz self.failUnless(plz.isPlasmaLogDir()) plz.zipLogDir() content = os.listdir(self._logdest) @@ -103,7 +112,9 @@ self.failIf(os.path.isdir(self._logdir)) def test_chatLogMover(self): - clm = self._clm + clm = ChatlogMover(self._logdir, self._chatdest) + self.failUnless(os.path.isdir(self._chatdest), self._chatdest) + clm.findLogs() clm.moveChatlogs() content = os.listdir(self._chatdest) @@ -114,6 +125,22 @@ self.failUnless(c.endswith(".txt")) self.failUnlessEqual(len(c), length, c) + def test_kiImageMover(self): + kif = KIImageFixer(self._kidir, self._kidest) + self.failUnless(os.path.isdir(self._kidest), self._kidest) + + kif.findFiles() + kif.checkAndCopyFiles() + content = os.listdir(self._kidest) + self.failUnlessEqual(len(content), 1, content) + self.failUnlessEqual(content, ["KIimage001.jpg"]) + + cleandata = open(os.path.join(base, 'avatar_clean.jpg'), 'rb').read() + newdata = open(os.path.join(self._kidest, "KIimage001.jpg"), 'rb').read() + + self.failUnlessEqual(len(cleandata), len(newdata)) + self.failUnlessEqual(cleandata, newdata) + def tearDown(self): shutil.rmtree(self._tmpdir) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |