[Pymoul-svn] SF.net SVN: pymoul: [208] binaryfile/trunk
Status: Alpha
Brought to you by:
tiran
From: <ti...@us...> - 2007-02-27 15:49:16
|
Revision: 208 http://pymoul.svn.sourceforge.net/pymoul/?rev=208&view=rev Author: tiran Date: 2007-02-27 07:49:15 -0800 (Tue, 27 Feb 2007) Log Message: ----------- Copied files Modified Paths: -------------- binaryfile/trunk/Makefile Added Paths: ----------- binaryfile/trunk/src/binaryfile/__init__.py binaryfile/trunk/src/binaryfile/binary.py binaryfile/trunk/src/binaryfile/binaryrecord.py binaryfile/trunk/src/binaryfile/tests.py Modified: binaryfile/trunk/Makefile =================================================================== --- binaryfile/trunk/Makefile 2007-02-27 15:38:21 UTC (rev 207) +++ binaryfile/trunk/Makefile 2007-02-27 15:49:15 UTC (rev 208) @@ -32,7 +32,7 @@ # What should the default be? test: - $(PYTHON) src/enumprocess/processinfo.py + $(PYTHON) src/binaryfile/tests.py egg: egg24 egg25 Added: binaryfile/trunk/src/binaryfile/__init__.py =================================================================== Property changes on: binaryfile/trunk/src/binaryfile/__init__.py ___________________________________________________________________ Name: svn:keywords + 'Id Revision' Name: svn:eol-style + native Copied: binaryfile/trunk/src/binaryfile/binary.py (from rev 204, pymoul/trunk/src/moul/crypt/binary.py) =================================================================== --- binaryfile/trunk/src/binaryfile/binary.py (rev 0) +++ binaryfile/trunk/src/binaryfile/binary.py 2007-02-27 15:49:15 UTC (rev 208) @@ -0,0 +1,283 @@ +# Copyright (C) 2007 Christian Heimes <christian (at) cheimes (dot) de> +"""Binary file helper +""" +__author__ = "Christian Heimes" +__version__ = "$Id" +__revision__ = "$Revision" + +from struct import calcsize +from struct import pack +from struct import unpack + +from binaryrecord import parseRecord +from binaryrecord import registerRecord + +class BinaryFile(file): + """Binary file + + A file based class with additional methods to read and write binary data. + + The class supports reading and writing: + - char: readChar / writeChar + - byte: readByte / writeByte + - bool: readBool / writeBool + - signed int8: read8 / write8 + - unsigned int8: read8s / write8s + - unsigned int 16: read16 / write16 + - signed int 16: read16s / write16s + - unsigned int 32: read32 / write32 + - signed int 32: read32s / write32s + - unsigned int 64: read64 / write64 + - signed int 64: read64s / write64s + - float: readFloat / writeFloat + - double: readDouble / writeDouble + - packed data: readPacked(fmt) / writePacked(fmt, data) + - quad (two int16): readQuad / writeQuad) + - NULL: read0 / write0 + - null string: readString0 / writeString0 (size is string + NULL) + - string w/ 16bit size header: readString16(null terminated) / writeString16 + - string w/ 32bit size header: readString32(null terminated) / writeString32 + + For conveniance the class has a size() method + + The class is using some optimization tricks like binding functions to the + local namespace of a method. + """ + def __new__(cls, fname, mode='rb'): + assert 'b' in mode + self = file.__new__(cls, fname, mode) + self.NULL = '\x00' + return self + + def size(self): + pos = self.tell() + try: + self.seek(0, 2) + return self.tell() + finally: + self.seek(pos) + + def readChar(self, _unpack=unpack): + return _unpack('<c', self.read(1))[0] + + def readByte(self, _unpack=unpack): + return _unpack('<B',self.read(1))[0] + + def readBool(self, _unpack=unpack): + return bool(_unpack('<B', self.read(1))[0]) + + def read8(self, _unpack=unpack): + return _unpack('<B', self.read(1))[0] + + def read8s(self, _unpack=unpack): + return _unpack('<b', self.read(1))[0] + + def read16(self, _unpack=unpack): + return _unpack('<H', self.read(2))[0] + + def read16s(self, _unpack=unpack): + return _unpack('<h', self.read(2))[0] + + def read32(self, _unpack=unpack): + return _unpack('<I', self.read(4))[0] + + def read32s(self, _unpack=unpack): + return _unpack('<i', self.read(4))[0] + + def read64(self, _unpack=unpack): + return _unpack('<Q', self.read(8))[0] + + def read64s(self, _unpack=unpack): + return _unpack('<q',self.read(8))[0] + + def readQuad(self, _unpack=unpack): + return _unpack('<2I', self.read(8)) + + def readFloat(self, _unpack=unpack): + return _unpack('<f', self.read(4))[0] + + def readDouble(self, _unpack=unpack): + return _unpack('<d', self.read(8))[0] + + def readPacked(self, fmt, _unpack=unpack): + return unpack(fmt, self.read(calcsize(fmt))) + + def read0(self): + null = self.read(1) + if null != self.NULL: + raise ValueError("%s != NULL at %i" % (null, self.tell()-1)) + return null + + def readUruString(self, version=5): + return UruString('', version=version).readfd(self) + + def readString0(self, size): + s = self.read(size-1) + self.read0() + return s + + def readString16(self, terminate=False): + return String16('', terminate=terminate).readfd(self) + + def readString32(self, terminate=False): + return String32('', terminate=terminate).readfd(self) + + def readRecord(self, name): + return parseRecord(name, self) + + @staticmethod + def registerRecord(name, fmt): + return registerRecord(name, fmt) + + #write + def writeChar(self, data, _pack=pack): + self.write(_pack('<c', data)) + + def writeByte(self, data, _pack=pack): + self.write(_pack('<B', data)) + + def writeBool(self, data, _pack=pack): + self.write(_pack('<B', bool(data))) + + def write8(self, data, _pack=pack): + self.write(_pack('<B', data)) + + def write8s(self, data, _pack=pack): + self.write(_pack('<b', data)) + + def write16(self, data, _pack=pack): + self.write(_pack('<H', data)) + + def write16s(self, data, _pack=pack): + self.write(_pack('<h', data)) + + def write32(self, data, _pack=pack): + self.write(_pack('<I', data)) + + def write32s(self, data, _pack=pack): + self.write(_pack('<i', data)) + + def write64(self, data, _pack=pack): + self.write(_pack('<Q', data)) + + def write64s(self, data, _pack=pack): + self.write(_pack('<q', data)) + + def writeQuad(self, tupl, _pack=pack): + self.write(_pack('<2I', *tupl)) + + def writeFloat(self, data, _pack=pack): + self.write(_pack('<f', data)) + + def writeDouble(self, data, _pack=pack): + self.write(_pack('<d', data)) + + def write0(self): + self.write(self.NULL) + + def writeString0(self, s): + self.write(s) + self.write0() + + def writePacked(self, data, fmt): + self.write(pack(fmt, data)) + + def writeUruString(self, data, version=5): + UruString(data, version=version).writefd(self) + + def writeString16(self, data, terminate=False): + String16(data, terminate=terminate).writefd(self) + + def writeString32(self, data, terminate=False): + String32(data, terminate=terminate).writefd(self) + + def writeRecord(self, rec): + self.write(rec.read()) + +class AbstractString(object): + """Abstract string class + """ + def __init__(self, s=''): + self._data = s + + def readfd(self, fd): + raise NotImplementedError + + def writefd(self, fd): + raise NotImplementedError + + def clear(self): + """Clear data + """ + self._data = '' + + def set(self, s): + """Replace current data with s + """ + self._data = s + + def __repr__(self): + """repr(self) + """ + return ("<%s at %x (%i)" % (self.__class__.__name__, id(self), + len(self))) + + def __len__(self): + """len(self) + """ + return len(self._data) + + def __cmp__(self, other): + if isinstance(other, AbstractString): + return cmp(self._data, other._data) + else: + return cmp(self._data, other) + +class String32(AbstractString): + """String with 32 bit size header + """ + def __init__(self, s='', terminate=False): + AbstractString.__init__(self, s) + self._terminate = bool(terminate) + + def readfd(self, fd): + size = fd.read32() + if self._terminate: + self._data = fd.readString0(size) + else: + self._data = fd.read(size) + return self._data + + def writefd(self, fd): + size = len(self) + if self._terminate: + fd.write32(size+1) + fd.writeString0(self._data) + else: + fd.write32(size) + fd.write(self._data) + +class String16(AbstractString): + """String with 16 bit size header + """ + def __init__(self, s='', terminate=False): + AbstractString.__init__(self, s) + self._terminate = bool(terminate) + + def readfd(self, fd): + size = fd.read16() + if self._terminate: + self._data = fd.readString0(size) + else: + self._data = fd.read(size) + return self._data + + def writefd(self, fd): + size = len(self) + if self._terminate: + fd.write16(size+1) + fd.writeString0(self._data) + else: + fd.write16(size) + fd.write(self._data) + Copied: binaryfile/trunk/src/binaryfile/binaryrecord.py (from rev 204, pymoul/trunk/src/moul/crypt/binaryrecord.py) =================================================================== --- binaryfile/trunk/src/binaryfile/binaryrecord.py (rev 0) +++ binaryfile/trunk/src/binaryfile/binaryrecord.py 2007-02-27 15:49:15 UTC (rev 208) @@ -0,0 +1,154 @@ +# Copyright (C) 2007 Christian Heimes <christian (at) cheimes (dot) de> +"""Binary file helper: Records + +This module is roughly based on Maciej Obarski's recipe "parse and create +fixed size binary data" from +http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/465219 +""" +__author__ = "Christian Heimes" +__version__ = "$Id" +__revision__ = "$Revision" + +from struct import calcsize +from struct import pack +from struct import unpack + +_marker = object() + +class RecordRegistry(dict): + """Registry for record definitions + """ + __slots__ = () + def register(self, name, fmt): + """Register a format by name + + @param name: name of the format + @type name: str + @param fmt: a record format + @type fmt: str + + Example: + >>> reg = RecordRegistry() + >>> registerRecord = reg.register + >>> parseRecord = reg.parse + >>> obj = registerRecord("connection", "4B.ip >H.port >I.session_id") + >>> isinstance(obj, RecordDefinition) + True + >>> data = "\\xc0\\xa8\\x00\\x01" + "\\x00P" + "\\xFE\\xDC\\xBA\\x98" + + >>> rec = parseRecord("connection", data) + >>> rec.ip + (192, 168, 0, 1) + >>> rec.port + 80 + >>> rec.session_id + 4275878552L + >>> rec.read() == data or rec.read() + True + """ + if name in self: + raise NameError("%s already registered!" % name) + self[name] = RecordDefinition(name, fmt) + return self[name] + + def parse(self, name, fd_data): + """Parse data using the RecordDefinition 'name' + + @param name: name of the format + @type name: str + @param fd_data: data to parse: either a string or an open file + @type fd_data: str or file + """ + return self[name](fd_data) + +class RecordDefinition(object): + """A record definition + """ + __slots__ = ('_fields', '_recordsize', '_name') + + def __init__(self, name, recordfmt): + self._name = name + self._fields = [] + pos = 0 + for field in recordfmt.split(): + if field.startswith('#'): + continue + fmt, name = field.split('.') + if '#' in name: + name = name.split('#')[0] + name = name.strip() + size = calcsize(fmt) + self._fields.append((name, fmt, pos, pos+size)) + pos += size + + self._recordsize = pos + + @property + def name(self): + return self._name + + @property + def size(self): + return self._recordsize + + def __call__(self, fd_data): + """Parse data using the format string + + @param fd_data: data to parse: either a string or an open file + @type fd_data: str or file + """ + if isinstance(fd_data, basestring): + # handle string + data = fd_data + elif hasattr(fd_data, 'read'): + data = fd_data.read(self._recordsize) + else: + raise TypeError(type(fd_data)) + if len(data) != self._recordsize: + raise ValueError("Data has wrong size: %i, required: %i" % + (len(data), self._recordsize)) + return Record(self._fields, data) + +class Record(object): + __slots__ = ('_fields', '_data') + + def __init__(self, fields, data=None): + self._fields = fields + self._data = {} + if data is not None: + self.write(data) + + def write(self, data): + """Write data + + Creates the instance attributes defined in fmt + """ + for name, fmt, start, stop in self._fields: + value = unpack(fmt, data[start:stop]) + if len(value) == 1: + value = value[0] + self._data[name] = value + + def read(self): + """Convert data to binary string + """ + result = [] + for name, fmt, start, stop in self._fields: + value = self._data[name] + if not isinstance(value, (tuple, list)): + value = (value,) + result.append(pack(fmt, *value)) + return ''.join(result) + + def __getattr__(self, name, default=_marker): + value = self._data.get(name, default) + if value is _marker: + raise AttributeError(name) + return value + + def __str__(self): + return self.read() + +_recordRegistry = RecordRegistry() +registerRecord = _recordRegistry.register +parseRecord = _recordRegistry.parse Copied: binaryfile/trunk/src/binaryfile/tests.py (from rev 204, pymoul/trunk/src/moul/crypt/tests/test_binary.py) =================================================================== --- binaryfile/trunk/src/binaryfile/tests.py (rev 0) +++ binaryfile/trunk/src/binaryfile/tests.py 2007-02-27 15:49:15 UTC (rev 208) @@ -0,0 +1,135 @@ +# Copyright (C) 2007 Christian Heimes <christian (at) cheimes (dot) de> +"""binaryfile unit tests +""" +__author__ = "Christian Heimes" +__version__ = "$Id$" +__revision__ = "$Revision$" + +import os +import unittest +from doctest import DocTestSuite +from tempfile import mkstemp + +from binary import BinaryFile + +class BinaryFileTest(unittest.TestCase): + def setUp(self): + self.tmpname = mkstemp()[1] + self.b = BinaryFile(self.tmpname, 'wb+') + + def tearDown(self): + self.b.close() + os.unlink(self.tmpname) + + def _testrw(self, name, data): + #import pdb; pdb.set_trace() + read = getattr(self.b, 'read%s' % name) + write = getattr(self.b, 'write%s' % name) + write(data) + self.b.seek(0) + fdata = read() + self.failUnlessEqual(data, fdata) + + def test_char(self): + self._testrw('Char', 'a') + + def test_byte(self): + self._testrw('Byte', 127) + + def test_bool(self): + self._testrw('Bool', True) + + def test_8(self): + self._testrw('8', 42) + + def test_8s(self): + self._testrw('8s', -42) + + def test_16(self): + self._testrw('16', 2**15) + + def test_16s(self): + self._testrw('16s', -2**15) + + def test_32(self): + self._testrw('32', 2*31) + + def test_32s(self): + self._testrw('32s', -2*31) + + def test_64(self): + self._testrw('64', 2*63) + + def test_64s(self): + self._testrw('64s', -2*63) + + def test_float(self): + data = -0.07 + self.b.writeFloat(data) + self.b.seek(0) + self.failUnlessAlmostEqual(data, self.b.readFloat()) + + def test_double(self): + self._testrw('Double', -23*10e200) + + def test_quad(self): + data = (23, 42) + self.b.writeQuad(data) + self.b.seek(0) + self.failUnlessEqual(data, self.b.readQuad()) + + def test_urustring(self): + # XXX: no test data + pass + + def test_string0(self): + s = "a test string" + l = len(s) + self.b.writeString0(s) + self.b.seek(0) + self.failUnlessEqual(self.b.size(), l+1) + self.failUnlessEqual(self.b.readString0(l+1), s) + self.b.seek(0) + self.failUnlessEqual(self.b.read(), s+'\x00') + + def test_string16(self): + s = "a test string" + l = len(s) + self.b.writeString16(s, terminate=False) + self.b.seek(0) + self.failUnlessEqual(self.b.size(), l+2) + self.failUnlessEqual(self.b.readString16(terminate=False), s) + + self.b.seek(0) + self.b.truncate(0) + self.b.writeString16(s, terminate=True) + self.b.seek(0) + self.failUnlessEqual(self.b.size(), l+3) + self.failUnlessEqual(self.b.readString16(terminate=True), s) + + def test_string32(self): + s = "a test string" + l = len(s) + self.b.writeString32(s, terminate=False) + self.b.seek(0) + self.failUnlessEqual(self.b.size(), l+4) + self.failUnlessEqual(self.b.readString32(terminate=False), s) + + self.b.seek(0) + self.b.truncate(0) + self.b.writeString32(s, terminate=True) + self.b.seek(0) + self.failUnlessEqual(self.b.size(), l+5) + self.failUnlessEqual(self.b.readString32(terminate=True), s) + + +def test_suite(): + return unittest.TestSuite(( + unittest.makeSuite(BinaryFileTest), + DocTestSuite('binary'), + DocTestSuite('binaryrecord'), + )) + +if __name__ == '__main__': + unittest.main(defaultTest="test_suite") + This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |