[Pymoul-svn] SF.net SVN: pymoul: [208] binaryfile/trunk
Status: Alpha
Brought to you by:
tiran
|
From: <ti...@us...> - 2007-02-27 15:49:16
|
Revision: 208
http://pymoul.svn.sourceforge.net/pymoul/?rev=208&view=rev
Author: tiran
Date: 2007-02-27 07:49:15 -0800 (Tue, 27 Feb 2007)
Log Message:
-----------
Copied files
Modified Paths:
--------------
binaryfile/trunk/Makefile
Added Paths:
-----------
binaryfile/trunk/src/binaryfile/__init__.py
binaryfile/trunk/src/binaryfile/binary.py
binaryfile/trunk/src/binaryfile/binaryrecord.py
binaryfile/trunk/src/binaryfile/tests.py
Modified: binaryfile/trunk/Makefile
===================================================================
--- binaryfile/trunk/Makefile 2007-02-27 15:38:21 UTC (rev 207)
+++ binaryfile/trunk/Makefile 2007-02-27 15:49:15 UTC (rev 208)
@@ -32,7 +32,7 @@
# What should the default be?
test:
- $(PYTHON) src/enumprocess/processinfo.py
+ $(PYTHON) src/binaryfile/tests.py
egg: egg24 egg25
Added: binaryfile/trunk/src/binaryfile/__init__.py
===================================================================
Property changes on: binaryfile/trunk/src/binaryfile/__init__.py
___________________________________________________________________
Name: svn:keywords
+ 'Id Revision'
Name: svn:eol-style
+ native
Copied: binaryfile/trunk/src/binaryfile/binary.py (from rev 204, pymoul/trunk/src/moul/crypt/binary.py)
===================================================================
--- binaryfile/trunk/src/binaryfile/binary.py (rev 0)
+++ binaryfile/trunk/src/binaryfile/binary.py 2007-02-27 15:49:15 UTC (rev 208)
@@ -0,0 +1,283 @@
+# Copyright (C) 2007 Christian Heimes <christian (at) cheimes (dot) de>
+"""Binary file helper
+"""
+__author__ = "Christian Heimes"
+# Subversion only expands keywords that are enclosed in two dollar signs
+__version__ = "$Id$"
+__revision__ = "$Revision$"
+
+from struct import calcsize
+from struct import pack
+from struct import unpack
+
+from binaryrecord import parseRecord
+from binaryrecord import registerRecord
+
+class BinaryFile(file):
+ """Binary file
+
+ A file based class with additional methods to read and write binary data.
+
+ The class supports reading and writing:
+ - char: readChar / writeChar
+ - byte: readByte / writeByte
+ - bool: readBool / writeBool
+ - unsigned int8: read8 / write8
+ - signed int8: read8s / write8s
+ - unsigned int 16: read16 / write16
+ - signed int 16: read16s / write16s
+ - unsigned int 32: read32 / write32
+ - signed int 32: read32s / write32s
+ - unsigned int 64: read64 / write64
+ - signed int 64: read64s / write64s
+ - float: readFloat / writeFloat
+ - double: readDouble / writeDouble
+ - packed data: readPacked(fmt) / writePacked(data, fmt)
+ - quad (two unsigned int 32): readQuad / writeQuad
+ - NULL: read0 / write0
+ - null string: readString0 / writeString0 (size is string + NULL)
+ - string w/ 16bit size header: readString16(null terminated) / writeString16
+ - string w/ 32bit size header: readString32(null terminated) / writeString32
+
+ For convenience the class has a size() method that returns the file size.
+
+ The class is using some optimization tricks like binding functions to the
+ local namespace of a method.
+ """
+ def __new__(cls, fname, mode='rb'):
+ assert 'b' in mode
+ self = file.__new__(cls, fname, mode)
+ self.NULL = '\x00'
+ return self
+
+ def size(self):
+ pos = self.tell()
+ try:
+ self.seek(0, 2)
+ return self.tell()
+ finally:
+ self.seek(pos)
+
+ def readChar(self, _unpack=unpack):
+ return _unpack('<c', self.read(1))[0]
+
+ def readByte(self, _unpack=unpack):
+ return _unpack('<B',self.read(1))[0]
+
+ def readBool(self, _unpack=unpack):
+ return bool(_unpack('<B', self.read(1))[0])
+
+ def read8(self, _unpack=unpack):
+ return _unpack('<B', self.read(1))[0]
+
+ def read8s(self, _unpack=unpack):
+ return _unpack('<b', self.read(1))[0]
+
+ def read16(self, _unpack=unpack):
+ return _unpack('<H', self.read(2))[0]
+
+ def read16s(self, _unpack=unpack):
+ return _unpack('<h', self.read(2))[0]
+
+ def read32(self, _unpack=unpack):
+ return _unpack('<I', self.read(4))[0]
+
+ def read32s(self, _unpack=unpack):
+ return _unpack('<i', self.read(4))[0]
+
+ def read64(self, _unpack=unpack):
+ return _unpack('<Q', self.read(8))[0]
+
+ def read64s(self, _unpack=unpack):
+ return _unpack('<q',self.read(8))[0]
+
+ def readQuad(self, _unpack=unpack):
+ return _unpack('<2I', self.read(8))
+
+ def readFloat(self, _unpack=unpack):
+ return _unpack('<f', self.read(4))[0]
+
+ def readDouble(self, _unpack=unpack):
+ return _unpack('<d', self.read(8))[0]
+
+    def readPacked(self, fmt, _unpack=unpack):
+        """Read calcsize(fmt) bytes and unpack them using the struct format
+
+        @param fmt: struct format string
+        @type fmt: str
+        @return: the tuple returned by struct.unpack (also for one field)
+        """
+        # use the bound default like all other read* methods do
+        return _unpack(fmt, self.read(calcsize(fmt)))
+
+ def read0(self):
+ null = self.read(1)
+ if null != self.NULL:
+ raise ValueError("%s != NULL at %i" % (null, self.tell()-1))
+ return null
+
+ def readUruString(self, version=5):
+ # NOTE(review): UruString is neither defined nor imported in this module,
+ # so this call raises NameError as committed -- confirm where the class
+ # is supposed to come from.
+ return UruString('', version=version).readfd(self)
+
+    def readString0(self, size):
+        """Read a NULL terminated string
+
+        @param size: number of bytes to consume (string + trailing NULL)
+        @type size: int
+        @return: the string without the trailing NULL
+        @raise ValueError: if size is smaller than 1 or the last byte read
+            is not NULL
+        """
+        if size < 1:
+            # read(size-1) would be a negative read and slurp the remaining
+            # file before read0() gets a chance to complain
+            raise ValueError("size must be at least 1, got %i" % size)
+        s = self.read(size-1)
+        self.read0()
+        return s
+
+ def readString16(self, terminate=False):
+ return String16('', terminate=terminate).readfd(self)
+
+ def readString32(self, terminate=False):
+ return String32('', terminate=terminate).readfd(self)
+
+ def readRecord(self, name):
+ return parseRecord(name, self)
+
+ @staticmethod
+ def registerRecord(name, fmt):
+ return registerRecord(name, fmt)
+
+ #write
+ def writeChar(self, data, _pack=pack):
+ self.write(_pack('<c', data))
+
+ def writeByte(self, data, _pack=pack):
+ self.write(_pack('<B', data))
+
+ def writeBool(self, data, _pack=pack):
+ self.write(_pack('<B', bool(data)))
+
+ def write8(self, data, _pack=pack):
+ self.write(_pack('<B', data))
+
+ def write8s(self, data, _pack=pack):
+ self.write(_pack('<b', data))
+
+ def write16(self, data, _pack=pack):
+ self.write(_pack('<H', data))
+
+ def write16s(self, data, _pack=pack):
+ self.write(_pack('<h', data))
+
+ def write32(self, data, _pack=pack):
+ self.write(_pack('<I', data))
+
+ def write32s(self, data, _pack=pack):
+ self.write(_pack('<i', data))
+
+ def write64(self, data, _pack=pack):
+ self.write(_pack('<Q', data))
+
+ def write64s(self, data, _pack=pack):
+ self.write(_pack('<q', data))
+
+ def writeQuad(self, tupl, _pack=pack):
+ self.write(_pack('<2I', *tupl))
+
+ def writeFloat(self, data, _pack=pack):
+ self.write(_pack('<f', data))
+
+ def writeDouble(self, data, _pack=pack):
+ self.write(_pack('<d', data))
+
+ def write0(self):
+ self.write(self.NULL)
+
+ def writeString0(self, s):
+ self.write(s)
+ self.write0()
+
+    def writePacked(self, data, fmt):
+        """Pack data using the struct format and write the result
+
+        @param data: a single value or a tuple/list with one value per field
+        @param fmt: struct format string
+        @type fmt: str
+        """
+        # struct.unpack always returns a tuple; expand sequences so that a
+        # value read with the same format can be written back unchanged
+        if not isinstance(data, (tuple, list)):
+            data = (data,)
+        self.write(pack(fmt, *data))
+
+ def writeUruString(self, data, version=5):
+ # NOTE(review): UruString is neither defined nor imported in this module,
+ # so this call raises NameError as committed -- confirm where the class
+ # is supposed to come from.
+ UruString(data, version=version).writefd(self)
+
+ def writeString16(self, data, terminate=False):
+ String16(data, terminate=terminate).writefd(self)
+
+ def writeString32(self, data, terminate=False):
+ String32(data, terminate=terminate).writefd(self)
+
+ def writeRecord(self, rec):
+ self.write(rec.read())
+
+class AbstractString(object):
+ """Abstract string class
+ """
+ def __init__(self, s=''):
+ self._data = s
+
+ def readfd(self, fd):
+ raise NotImplementedError
+
+ def writefd(self, fd):
+ raise NotImplementedError
+
+ def clear(self):
+ """Clear data
+ """
+ self._data = ''
+
+ def set(self, s):
+ """Replace current data with s
+ """
+ self._data = s
+
+    def __repr__(self):
+        """repr(self)
+
+        Format: <ClassName at hex-id (length of the data)>
+        """
+        # the closing '>' was missing from the format string
+        return ("<%s at %x (%i)>" % (self.__class__.__name__, id(self),
+                                     len(self)))
+
+ def __len__(self):
+ """len(self)
+ """
+ return len(self._data)
+
+ def __cmp__(self, other):
+ if isinstance(other, AbstractString):
+ return cmp(self._data, other._data)
+ else:
+ return cmp(self._data, other)
+
+class String32(AbstractString):
+ """String with 32 bit size header
+ """
+ def __init__(self, s='', terminate=False):
+ AbstractString.__init__(self, s)
+ self._terminate = bool(terminate)
+
+ def readfd(self, fd):
+ size = fd.read32()
+ if self._terminate:
+ self._data = fd.readString0(size)
+ else:
+ self._data = fd.read(size)
+ return self._data
+
+ def writefd(self, fd):
+ size = len(self)
+ if self._terminate:
+ fd.write32(size+1)
+ fd.writeString0(self._data)
+ else:
+ fd.write32(size)
+ fd.write(self._data)
+
+class String16(AbstractString):
+ """String with 16 bit size header
+ """
+ def __init__(self, s='', terminate=False):
+ AbstractString.__init__(self, s)
+ self._terminate = bool(terminate)
+
+ def readfd(self, fd):
+ size = fd.read16()
+ if self._terminate:
+ self._data = fd.readString0(size)
+ else:
+ self._data = fd.read(size)
+ return self._data
+
+ def writefd(self, fd):
+ size = len(self)
+ if self._terminate:
+ fd.write16(size+1)
+ fd.writeString0(self._data)
+ else:
+ fd.write16(size)
+ fd.write(self._data)
+
Copied: binaryfile/trunk/src/binaryfile/binaryrecord.py (from rev 204, pymoul/trunk/src/moul/crypt/binaryrecord.py)
===================================================================
--- binaryfile/trunk/src/binaryfile/binaryrecord.py (rev 0)
+++ binaryfile/trunk/src/binaryfile/binaryrecord.py 2007-02-27 15:49:15 UTC (rev 208)
@@ -0,0 +1,154 @@
+# Copyright (C) 2007 Christian Heimes <christian (at) cheimes (dot) de>
+"""Binary file helper: Records
+
+This module is roughly based on Maciej Obarski's recipe "parse and create
+fixed size binary data" from
+http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/465219
+"""
+__author__ = "Christian Heimes"
+# Subversion only expands keywords that are enclosed in two dollar signs
+__version__ = "$Id$"
+__revision__ = "$Revision$"
+
+from struct import calcsize
+from struct import pack
+from struct import unpack
+
+_marker = object()
+
+class RecordRegistry(dict):
+ """Registry for record definitions
+ """
+ __slots__ = ()
+ def register(self, name, fmt):
+ """Register a format by name
+
+ @param name: name of the format
+ @type name: str
+ @param fmt: a record format
+ @type fmt: str
+
+ Example:
+ >>> reg = RecordRegistry()
+ >>> registerRecord = reg.register
+ >>> parseRecord = reg.parse
+ >>> obj = registerRecord("connection", "4B.ip >H.port >I.session_id")
+ >>> isinstance(obj, RecordDefinition)
+ True
+ >>> data = "\\xc0\\xa8\\x00\\x01" + "\\x00P" + "\\xFE\\xDC\\xBA\\x98"
+
+ >>> rec = parseRecord("connection", data)
+ >>> rec.ip
+ (192, 168, 0, 1)
+ >>> rec.port
+ 80
+ >>> rec.session_id
+ 4275878552L
+ >>> rec.read() == data or rec.read()
+ True
+ """
+ if name in self:
+ raise NameError("%s already registered!" % name)
+ self[name] = RecordDefinition(name, fmt)
+ return self[name]
+
+ def parse(self, name, fd_data):
+ """Parse data using the RecordDefinition 'name'
+
+ @param name: name of the format
+ @type name: str
+ @param fd_data: data to parse: either a string or an open file
+ @type fd_data: str or file
+ """
+ return self[name](fd_data)
+
+class RecordDefinition(object):
+ """A record definition
+ """
+ __slots__ = ('_fields', '_recordsize', '_name')
+
+ def __init__(self, name, recordfmt):
+ self._name = name
+ self._fields = []
+ pos = 0
+ for field in recordfmt.split():
+ if field.startswith('#'):
+ continue
+ fmt, name = field.split('.')
+ if '#' in name:
+ name = name.split('#')[0]
+ name = name.strip()
+ size = calcsize(fmt)
+ self._fields.append((name, fmt, pos, pos+size))
+ pos += size
+
+ self._recordsize = pos
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def size(self):
+ return self._recordsize
+
+ def __call__(self, fd_data):
+ """Parse data using the format string
+
+ @param fd_data: data to parse: either a string or an open file
+ @type fd_data: str or file
+ """
+ if isinstance(fd_data, basestring):
+ # handle string
+ data = fd_data
+ elif hasattr(fd_data, 'read'):
+ data = fd_data.read(self._recordsize)
+ else:
+ raise TypeError(type(fd_data))
+ if len(data) != self._recordsize:
+ raise ValueError("Data has wrong size: %i, required: %i" %
+ (len(data), self._recordsize))
+ return Record(self._fields, data)
+
+class Record(object):
+ __slots__ = ('_fields', '_data')
+
+ def __init__(self, fields, data=None):
+ self._fields = fields
+ self._data = {}
+ if data is not None:
+ self.write(data)
+
+ def write(self, data):
+ """Write data
+
+ Creates the instance attributes defined in fmt
+ """
+ for name, fmt, start, stop in self._fields:
+ value = unpack(fmt, data[start:stop])
+ if len(value) == 1:
+ value = value[0]
+ self._data[name] = value
+
+ def read(self):
+ """Convert data to binary string
+ """
+ result = []
+ for name, fmt, start, stop in self._fields:
+ value = self._data[name]
+ if not isinstance(value, (tuple, list)):
+ value = (value,)
+ result.append(pack(fmt, *value))
+ return ''.join(result)
+
+ def __getattr__(self, name, default=_marker):
+ value = self._data.get(name, default)
+ if value is _marker:
+ raise AttributeError(name)
+ return value
+
+ def __str__(self):
+ return self.read()
+
+_recordRegistry = RecordRegistry()
+registerRecord = _recordRegistry.register
+parseRecord = _recordRegistry.parse
Copied: binaryfile/trunk/src/binaryfile/tests.py (from rev 204, pymoul/trunk/src/moul/crypt/tests/test_binary.py)
===================================================================
--- binaryfile/trunk/src/binaryfile/tests.py (rev 0)
+++ binaryfile/trunk/src/binaryfile/tests.py 2007-02-27 15:49:15 UTC (rev 208)
@@ -0,0 +1,135 @@
+# Copyright (C) 2007 Christian Heimes <christian (at) cheimes (dot) de>
+"""binaryfile unit tests
+"""
+__author__ = "Christian Heimes"
+__version__ = "$Id$"
+__revision__ = "$Revision$"
+
+import os
+import unittest
+from doctest import DocTestSuite
+from tempfile import mkstemp
+
+from binary import BinaryFile
+
+class BinaryFileTest(unittest.TestCase):
+    def setUp(self):
+        """Create an empty temporary file and open it as BinaryFile
+        """
+        # mkstemp() returns an already opened OS-level descriptor; close it
+        # instead of leaking it, BinaryFile opens the file by name
+        fd, self.tmpname = mkstemp()
+        os.close(fd)
+        self.b = BinaryFile(self.tmpname, 'wb+')
+
+ def tearDown(self):
+ self.b.close()
+ os.unlink(self.tmpname)
+
+ def _testrw(self, name, data):
+ #import pdb; pdb.set_trace()
+ read = getattr(self.b, 'read%s' % name)
+ write = getattr(self.b, 'write%s' % name)
+ write(data)
+ self.b.seek(0)
+ fdata = read()
+ self.failUnlessEqual(data, fdata)
+
+ def test_char(self):
+ self._testrw('Char', 'a')
+
+ def test_byte(self):
+ self._testrw('Byte', 127)
+
+ def test_bool(self):
+ self._testrw('Bool', True)
+
+ def test_8(self):
+ self._testrw('8', 42)
+
+ def test_8s(self):
+ self._testrw('8s', -42)
+
+ def test_16(self):
+ self._testrw('16', 2**15)
+
+ def test_16s(self):
+ self._testrw('16s', -2**15)
+
+    def test_32(self):
+        # 2**31 sets the highest bit of an unsigned 32 bit integer
+        self._testrw('32', 2**31)
+
+    def test_32s(self):
+        # smallest signed 32 bit integer
+        self._testrw('32s', -2**31)
+
+    def test_64(self):
+        # 2**63 sets the highest bit of an unsigned 64 bit integer
+        self._testrw('64', 2**63)
+
+    def test_64s(self):
+        # smallest signed 64 bit integer
+        self._testrw('64s', -2**63)
+
+ def test_float(self):
+ data = -0.07
+ self.b.writeFloat(data)
+ self.b.seek(0)
+ self.failUnlessAlmostEqual(data, self.b.readFloat())
+
+ def test_double(self):
+ self._testrw('Double', -23*10e200)
+
+ def test_quad(self):
+ data = (23, 42)
+ self.b.writeQuad(data)
+ self.b.seek(0)
+ self.failUnlessEqual(data, self.b.readQuad())
+
+ def test_urustring(self):
+ # XXX: no test data
+ pass
+
+ def test_string0(self):
+ s = "a test string"
+ l = len(s)
+ self.b.writeString0(s)
+ self.b.seek(0)
+ self.failUnlessEqual(self.b.size(), l+1)
+ self.failUnlessEqual(self.b.readString0(l+1), s)
+ self.b.seek(0)
+ self.failUnlessEqual(self.b.read(), s+'\x00')
+
+ def test_string16(self):
+ s = "a test string"
+ l = len(s)
+ self.b.writeString16(s, terminate=False)
+ self.b.seek(0)
+ self.failUnlessEqual(self.b.size(), l+2)
+ self.failUnlessEqual(self.b.readString16(terminate=False), s)
+
+ self.b.seek(0)
+ self.b.truncate(0)
+ self.b.writeString16(s, terminate=True)
+ self.b.seek(0)
+ self.failUnlessEqual(self.b.size(), l+3)
+ self.failUnlessEqual(self.b.readString16(terminate=True), s)
+
+ def test_string32(self):
+ s = "a test string"
+ l = len(s)
+ self.b.writeString32(s, terminate=False)
+ self.b.seek(0)
+ self.failUnlessEqual(self.b.size(), l+4)
+ self.failUnlessEqual(self.b.readString32(terminate=False), s)
+
+ self.b.seek(0)
+ self.b.truncate(0)
+ self.b.writeString32(s, terminate=True)
+ self.b.seek(0)
+ self.failUnlessEqual(self.b.size(), l+5)
+ self.failUnlessEqual(self.b.readString32(terminate=True), s)
+
+
+def test_suite():
+ return unittest.TestSuite((
+ unittest.makeSuite(BinaryFileTest),
+ DocTestSuite('binary'),
+ DocTestSuite('binaryrecord'),
+ ))
+
+if __name__ == '__main__':
+ unittest.main(defaultTest="test_suite")
+
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|