[Pymoul-svn] SF.net SVN: pymoul: [158] pymoul/trunk/src/moul/crypt
Status: Alpha
Brought to you by:
tiran
|
From: <ti...@us...> - 2007-02-10 16:01:33
|
Revision: 158
http://pymoul.svn.sourceforge.net/pymoul/?rev=158&view=rev
Author: tiran
Date: 2007-02-10 08:01:28 -0800 (Sat, 10 Feb 2007)
Log Message:
-----------
Added binary record
Modified Paths:
--------------
pymoul/trunk/src/moul/crypt/binary.py
Added Paths:
-----------
pymoul/trunk/src/moul/crypt/binaryrecord.py
pymoul/trunk/src/moul/crypt/tests/test_binaryrecord.py
Modified: pymoul/trunk/src/moul/crypt/binary.py
===================================================================
--- pymoul/trunk/src/moul/crypt/binary.py 2007-02-10 13:14:18 UTC (rev 157)
+++ pymoul/trunk/src/moul/crypt/binary.py 2007-02-10 16:01:28 UTC (rev 158)
@@ -25,6 +25,9 @@
from struct import unpack
from struct import calcsize
+from moul.crypt.binaryrecord import parseRecord
+from moul.crypt.binaryrecord import registerRecord
+
class BinaryFile(file):
"""Binary file
@@ -136,6 +139,13 @@
def readString32(self, terminate=False):
return String32('', terminate=terminate).readfd(self)
+ def readRecord(self, name):
+ return parseRecord(name, self)
+
+ @staticmethod
+ def registerRecord(name, fmt):
+ return registerRecord(name, fmt)
+
#write
def writeChar(self, data, _pack=pack):
self.write(_pack('<c', data))
@@ -198,6 +208,9 @@
def writeString32(self, data, terminate=False):
String32(data, terminate=terminate).writefd(self)
+ def writeRecord(self, rec):
+ self.write(rec.read())
+
class AbstractString(object):
"""Abstract string class
"""
Added: pymoul/trunk/src/moul/crypt/binaryrecord.py
===================================================================
--- pymoul/trunk/src/moul/crypt/binaryrecord.py (rev 0)
+++ pymoul/trunk/src/moul/crypt/binaryrecord.py 2007-02-10 16:01:28 UTC (rev 158)
@@ -0,0 +1,170 @@
+# pyMoul - Python interface to Myst Online URU Live
+# Copyright (C) 2007 Christian Heimes <christian (at) cheimes (dot) de>
+
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc., 59
+# Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+"""Binary file helper: Records
+
+This module is roughly based on Maciej Obarski's recipe "parse and create
+fixed size binary data" from
+http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/465219
+"""
+__author__ = "Christian Heimes"
+__version__ = "$Id"
+__revision__ = "$Revision"
+
+from struct import pack
+from struct import unpack
+from struct import calcsize
+
+_marker = object()
+
class RecordRegistry(dict):
    """Dictionary that maps record names to RecordDefinition objects
    """
    __slots__ = ()

    def register(self, name, fmt):
        """Build a RecordDefinition from fmt and store it under name

        @param name: name of the format
        @type name: str
        @param fmt: a record format
        @type fmt: str
        @return: the newly created record definition
        @rtype: RecordDefinition
        @raise NameError: if a definition is already stored under name

        Example:
        >>> reg = RecordRegistry()
        >>> registerRecord = reg.register
        >>> parseRecord = reg.parse
        >>> obj = registerRecord("connection", "4B.ip >H.port >I.session_id")
        >>> isinstance(obj, RecordDefinition)
        True
        >>> data = "\\xc0\\xa8\\x00\\x01" + "\\x00P" + "\\xFE\\xDC\\xBA\\x98"

        >>> rec = parseRecord("connection", data)
        >>> rec.ip
        (192, 168, 0, 1)
        >>> rec.port
        80
        >>> rec.session_id
        4275878552L
        >>> rec.read() == data or rec.read()
        True
        """
        if name not in self:
            definition = RecordDefinition(name, fmt)
            self[name] = definition
            return definition
        raise NameError("%s already registered!" % name)

    def parse(self, name, fd_data):
        """Look up the definition stored under name and apply it to fd_data

        @param name: name of the format
        @type name: str
        @param fd_data: data to parse: either a string or an open file
        @type fd_data: str or file
        @return: whatever calling the stored definition returns
        """
        definition = self[name]
        return definition(fd_data)
+
class RecordDefinition(object):
    """A record definition

    A record format is a whitespace separated list of field tokens of the
    form "<struct format>.<field name>", e.g. "4B.ip >H.port". Tokens that
    start with '#' are skipped and everything after a '#' inside a field
    name is dropped. Every field is sized on its own with struct.calcsize
    and the fields are laid out back to back in the order they appear.
    """
    __slots__ = ('_fields', '_recordsize', '_name')

    # Types accepted by __call__ as raw record data. basestring exists only
    # on Python 2; fall back to bytes, which is what struct works on, when
    # it is missing.
    try:
        _string_types = (basestring,)
    except NameError:
        _string_types = (bytes,)

    def __init__(self, name, recordfmt):
        """Parse recordfmt into a list of (name, fmt, start, stop) fields

        @param name: name of the record definition
        @type name: str
        @param recordfmt: record format, see the class documentation
        @type recordfmt: str
        @raise ValueError: if a field token is not "<format>.<name>"
        """
        self._name = name
        self._fields = []
        pos = 0
        for token in recordfmt.split():
            if token.startswith('#'):
                continue
            parts = token.split('.')
            if len(parts) != 2:
                raise ValueError(
                    "Invalid field %r in record %r: expected "
                    "'<format>.<name>'" % (token, name))
            # a separate variable keeps the 'name' argument intact
            fmt, fieldname = parts
            fieldname = fieldname.split('#')[0].strip()
            size = calcsize(fmt)
            self._fields.append((fieldname, fmt, pos, pos + size))
            pos += size

        self._recordsize = pos

    @property
    def name(self):
        """Name the definition was created with"""
        return self._name

    @property
    def size(self):
        """Total size of one record in bytes (sum of all field sizes)"""
        return self._recordsize

    def __call__(self, fd_data):
        """Parse data using the format string

        @param fd_data: data to parse: either a string or an open file
        @type fd_data: str or file
        @return: a Record built from the field list and the data
        @raise TypeError: if fd_data is neither a string nor has a read()
            method
        @raise ValueError: if the data is not exactly one record long
        """
        if isinstance(fd_data, self._string_types):
            # handle string
            data = fd_data
        elif hasattr(fd_data, 'read'):
            # read exactly one record from the file like object
            data = fd_data.read(self._recordsize)
        else:
            raise TypeError(type(fd_data))
        if len(data) != self._recordsize:
            raise ValueError("Data has wrong size: %i, required: %i" %
                             (len(data), self._recordsize))
        return Record(self._fields, data)
+
class Record(object):
    """Values of a single binary record

    The record is described by a list of (name, fmt, start, stop) tuples.
    The unpacked value of every field is kept in a dictionary and is
    available as an attribute of the same name.
    """
    __slots__ = ('_fields', '_data')

    # Sentinel for "no default given" in __getattr__
    _absent = object()

    def __init__(self, fields, data=None):
        """
        @param fields: list of (name, fmt, start, stop) tuples
        @type fields: list
        @param data: binary data to unpack right away (optional)
        @type data: str
        """
        self._fields = fields
        self._data = {}
        if data is not None:
            self.write(data)

    def write(self, data):
        """Write data

        Unpacks the slice data[start:stop] of every field with the format
        of the field and stores the result under the name of the field.
        A result with exactly one element is stored as plain value, all
        others are stored as tuple.
        """
        for name, fmt, start, stop in self._fields:
            value = unpack(fmt, data[start:stop])
            if len(value) == 1:
                value = value[0]
            self._data[name] = value

    def read(self):
        """Convert data to binary string

        @raise KeyError: if a field has no value yet
        """
        chunks = []
        for name, fmt, start, stop in self._fields:
            value = self._data[name]
            if not isinstance(value, (tuple, list)):
                value = (value,)
            chunks.append(pack(fmt, *value))
        # struct.pack() returns byte strings, join them with a byte string
        # (b'' is the same as '' on Python 2)
        return b''.join(chunks)

    def __getattr__(self, name, default=_absent):
        """Look up a field value by attribute access

        Only called when the normal attribute lookup fails.

        @raise AttributeError: if there is no such field and no default
        """
        # self._data must not be used here: while the slot is unset (bare
        # __new__, copy, unpickle) the failing lookup would end up in
        # __getattr__ again and recurse without end.
        try:
            values = object.__getattribute__(self, '_data')
        except AttributeError:
            values = {}
        value = values.get(name, default)
        if value is Record._absent:
            raise AttributeError(name)
        return value

    def __str__(self):
        return self.read()
+
# Default registry of the module. The two module level functions are the
# bound methods of this single instance and therefore share its definitions.
_recordRegistry = RecordRegistry()
parseRecord = _recordRegistry.parse
registerRecord = _recordRegistry.register
Added: pymoul/trunk/src/moul/crypt/tests/test_binaryrecord.py
===================================================================
--- pymoul/trunk/src/moul/crypt/tests/test_binaryrecord.py (rev 0)
+++ pymoul/trunk/src/moul/crypt/tests/test_binaryrecord.py 2007-02-10 16:01:28 UTC (rev 158)
@@ -0,0 +1,38 @@
+# pyMoul - Python interface to Myst Online URU Live
+# Copyright (C) 2007 Christian Heimes <christian (at) cheimes (dot) de>
+
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc., 59
+# Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+"""moul.crypt.binaryrecord unit tests
+"""
+__author__ = "Christian Heimes"
+__version__ = "$Id: test_elf.py 122 2007-02-02 17:34:06Z tiran $"
+__revision__ = "$Revision: 122 $"
+
+import os
+import unittest
+from doctest import DocTestSuite
+
+import moul.crypt.binaryrecord
+
def test_suite():
    """Return a test suite with the doctests of moul.crypt.binaryrecord
    """
    suite = unittest.TestSuite()
    suite.addTest(DocTestSuite('moul.crypt.binaryrecord'))
    return suite


if __name__ == '__main__':
    unittest.main(defaultTest="test_suite")
+
+
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|