[Gug-cvs] gug/gug zsi.py,NONE,1.1
Status: Planning
Brought to you by:
szferi
From: Nagy Z. <zs...@us...> - 2007-05-08 13:56:28
|
# Update of /cvsroot/gug/gug/gug
# In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv10433/gug
#
# Added Files:
#     zsi.py
# Log Message:
#     gug.zsi.MyAny class instead of patching ZSI for ZSI-2.0
#
# --- NEW FILE: zsi.py ---
#
# NOTE(review): the nesting below was reconstructed from a listing whose
# whitespace had been collapsed -- confirm against the file in CVS.

from ZSI import TC
from ZSI import _copyright, _children, _child_elements, \
    _floattypes, _stringtypes, _seqtypes, _find_attr, _find_attrNS, _find_attrNodeNS, \
    _find_arraytype, _find_default_namespace, _find_href, _find_encstyle, \
    _resolve_prefix, _find_xsi_attr, _find_type, \
    _find_xmlns_prefix, _get_element_nsuri_name, _get_idstr, \
    _Node, EvaluateException, \
    _valid_encoding, ParseException
from ZSI.wstools.logging import getLogger as _GetLogger
from ZSI.wstools.Namespaces import SCHEMA, SOAP
import types, time
from ZSI.TC import Nilled, _is_xsd_or_soap_ns

from gug.common import log


class MyAny(TC.Any):
    '''When the type isn't defined in the schema, but must be specified
    in the incoming operation.

    Subclass of ZSI's TC.Any that, per the commit log, lives in gug so
    that ZSI 2.0 itself does not have to be patched.

        parsemap -- a type to class mapping (updated by descendants), for
                    parsing
        serialmap -- same, for (outgoing) serialization
    '''
    logger = _GetLogger('gug.zsi.MyAny')

    def __init__(self, pname=None, aslist=False, minOccurs=0, **kw):
        '''Forward all arguments unchanged to TC.Any.__init__.'''
        TC.Any.__init__(self, pname, aslist, minOccurs, **kw)

    def parse(self, elt, ps):
        '''Parse the element `elt` into a Python value.

        elt -- the element to parse
        ps  -- the parsed-soap object; used for href resolution and for
               building backtraces in error messages

        Returns:
          * Nilled when the element carries no type and self.nilled()
            reports it as nil.
          * For an element with no children and no href, when
            minOccurs < 1: the result of the parser registered under
            (None, type) if the namespace is an xsd/SOAP one and such a
            parser exists; otherwise [] for a SOAP-ENC Array (or an
            arrayType ending in '[0]'), {} when no type is given, and
            None in every other case.
          * For untyped or SOAP-ENC Array elements: a list of parsed
            children (when self.aslist is set or an arrayType attribute
            is present), self.simple_value() when there are no child
            elements, else self.parse_into_dict_or_list().
          * Otherwise whatever the parser found in MyAny.parsemap returns.

        Raises EvaluateException when a required (minOccurs >= 1) element
        is empty, or when no parser is registered for the element's type.
        '''
        # `tname` rather than `type`, so the builtin is not shadowed.
        (ns, tname) = self.checkname(elt, ps)
        if not tname and self.nilled(elt, ps):
            return Nilled

        if len(_children(elt)) == 0:
            href = _find_href(elt)
            if not href:
                if self.minOccurs < 1:
                    if _is_xsd_or_soap_ns(ns):
                        parser = MyAny.parsemap.get((None, tname))
                        if parser:
                            return parser.parse(elt, ps)
                    if ((ns, tname) == (SOAP.ENC, 'Array') or
                            (_find_arraytype(elt) or '').endswith('[0]')):
                        return []
                    if not tname:
                        return {}
                    return None
                raise EvaluateException('Required Any missing',
                        ps.Backtrace(elt))
            # Empty element that points elsewhere: continue with the
            # referenced element and re-read its type.
            elt = ps.FindLocalHREF(href, elt)
            (ns, tname) = self.checktype(elt, ps)

        if not tname and elt.namespaceURI == SOAP.ENC:
            ns, tname = SOAP.ENC, elt.localName

        if not tname or (ns, tname) == (SOAP.ENC, 'Array'):
            if self.aslist or _find_arraytype(elt):
                return [ self.__class__(**self.kwargs).parse(e, ps)
                            for e in _child_elements(elt) ]
            if len(_child_elements(elt)) == 0:
                return self.simple_value(elt, ps)
            return self.parse_into_dict_or_list(elt, ps)

        parser = MyAny.parsemap.get((ns, tname))
        if not parser and _is_xsd_or_soap_ns(ns):
            parser = MyAny.parsemap.get((None, tname))
        if not parser:
            raise EvaluateException('''gug.zsi.MyAny can't parse element''',
                    ps.Backtrace(elt))
        return parser.parse(elt, ps)

    def serialize(self, elt, sw, pyobj, name=None, **kw):
        '''Serialize `pyobj` as a child of `elt`.

        elt   -- parent element the new element is appended to
        sw    -- the writer object, passed through to nested serializers
        pyobj -- the Python value to serialize
        name  -- optional element name, resolved through self.get_name()
        kw    -- extra options forwarded to nested serializers; 'typed'
                 defaults to self.typed

        Dispatch order:
          1. an object with its own `typecode` (other than self) is handed
             to that typecode;
          2. sequences (types listed in _seqtypes) become a SOAP-ENC Array
             whose items are named 'element';
          3. dictionaries become an element with one child per key; keys
             must be str or unicode;
          4. anything else is looked up in MyAny.serialmap, with
             time.struct_time falling back to gDateTime;
          5. with no serializer: None is written as nil, old-style
             instances are serialized through their __dict__, and any
             other value raises EvaluateException.

        Raises Exception for a dictionary key that is not a string, and
        EvaluateException when the value cannot be serialized.
        '''
        if hasattr(pyobj, 'typecode') and pyobj.typecode is not self:
            pyobj.typecode.serialize(elt, sw, pyobj, **kw)
            return

        objid = _get_idstr(pyobj)
        ns, n = self.get_name(name, objid)
        kw.setdefault('typed', self.typed)
        tc = type(pyobj)
        self.logger.debug('gug.zsi.MyAny serialize -- %s', tc)

        if tc in _seqtypes:
            # Sequences are always written as a SOAP-ENC array: the
            # `self.aslist` check is deliberately bypassed here.
            array = elt.createAppendElement(ns, n)
            array.setAttributeType(SOAP.ENC, "Array")
            array.setAttributeNS(self.nspname, 'SOAP-ENC:arrayType',
                "xsd:anyType[" + str(len(pyobj)) + "]")
            for o in pyobj:
                #TODO maybe this should take **self.kwargs...
                serializer = getattr(o, 'typecode', self.__class__()) # also used by _AnyLax()
                serializer.serialize(array, sw, o, name='element', **kw)
            return

        kw['name'] = (ns, n)
        if tc == types.DictType:
            el = elt.createAppendElement(ns, n)
            # Temporarily clear nspname for dict elements.  The restore is
            # in a `finally` so a failure while serializing a value (or a
            # bad key) cannot leave this typecode with nspname = None.
            parentNspname = self.nspname
            self.nspname = None
            try:
                for key, value in pyobj.items():
                    if type(key) not in (types.StringType, types.UnicodeType):
                        raise Exception('Dictionary implementation requires keys to be of type string (or unicode).')
                    kw['name'] = key
                    kw.setdefault('typed', True)
                    self.serialize(el, sw, value, **kw)
            finally:
                self.nspname = parentNspname
            return

        if tc == types.InstanceType:
            # Old-style instance: look it up by its class, then by the
            # (ClassType, class name) key.
            tc = pyobj.__class__
            if hasattr(pyobj, 'typecode'):
                serializer = pyobj.typecode
            else:
                serializer = MyAny.serialmap.get(tc)
            if not serializer:
                tc = (types.ClassType, pyobj.__class__.__name__)
                serializer = MyAny.serialmap.get(tc)
        else:
            serializer = MyAny.serialmap.get(tc)
            if not serializer and isinstance(pyobj, time.struct_time):
                from ZSI.TCtimes import gDateTime
                serializer = gDateTime()

        if not serializer:
            # Last-chance; serialize instances as dictionary
            if pyobj is None:
                self.serialize_as_nil(elt.createAppendElement(ns, n))
            elif type(pyobj) != types.InstanceType:
                raise EvaluateException('''gug.zsi.MyAny can't serialize ''' + \
                        repr(pyobj))
            else:
                self.serialize(elt, sw, pyobj.__dict__, **kw)
        else:
            # Try to make the element name self-describing
            tag = getattr(serializer, 'tag', None)
            if self.pname is not None:
                if "typed" not in kw:
                    kw['typed'] = False
            elif tag:
                if tag.find(':') == -1:
                    tag = 'SOAP-ENC:' + tag
                kw['name'] = tag
                kw['typed'] = False

            serializer.unique = self.unique
            serializer.serialize(elt, sw, pyobj, **kw)