From: SVN c. m. f. t. SWORD-A. p. <swo...@li...> - 2012-01-22 17:09:31
|
Revision: 458 http://sword-app.svn.sourceforge.net/sword-app/?rev=458&view=rev Author: richard-jones Date: 2012-01-22 17:09:24 +0000 (Sun, 22 Jan 2012) Log Message: ----------- add EntryDocument load from xml string, to aid in server-side interpretation of incoming atom entries. Also add tests for this object Modified Paths: -------------- sss/branches/sss-2/sss/core.py sss/branches/sss-2/sss/spec.py Added Paths: ----------- sss/branches/sss-2/tests/functional/test_entry.py Modified: sss/branches/sss-2/sss/core.py =================================================================== --- sss/branches/sss-2/sss/core.py 2012-01-20 10:54:06 UTC (rev 457) +++ sss/branches/sss-2/sss/core.py 2012-01-22 17:09:24 UTC (rev 458) @@ -1,4 +1,4 @@ -import web, os, base64 +import web, os, base64, uuid from lxml import etree from datetime import datetime from spec import Namespaces, HttpHeaders, Errors @@ -145,12 +145,14 @@ def __init__(self, atom_id=None, alternate_uri=None, content_uri=None, edit_uri=None, se_uri=None, em_uris=[], packaging=[], state_uris=[], updated=None, dc_metadata={}, generator=("http://www.swordapp.org/sss", __version__), - verbose_description=None, treatment=None, original_deposit_uri=None, derived_resource_uris=[], nsmap=None): + verbose_description=None, treatment=None, original_deposit_uri=None, derived_resource_uris=[], nsmap=None, + xml_source=None): self.ns = Namespaces() self.drmap = {None: self.ns.ATOM_NS, "sword" : self.ns.SWORD_NS, "dcterms" : self.ns.DC_NS} if nsmap is not None: self.drmap = nsmap + self.other_metadata = {} self.dc_metadata = dc_metadata self.atom_id = atom_id if atom_id is not None else "urn:uuid:" + str(uuid.uuid4()) self.updated = updated if updated is not None else datetime.now() @@ -166,7 +168,116 @@ self.state_uris = state_uris self.original_deposit_uri = original_deposit_uri self.derived_resource_uris = derived_resource_uris + + # we may have been passed the xml_source argument, in which case we want + # to load from a string + self.links = {} + self.dom = None + self.parsed = False + if xml_source is not None: + self._load(xml_source) + def _load(self, xml_source): + try: + self.dom = etree.fromstring(xml_source) + self.parsed = True + except Exception as e: + ssslog.error("Was not able to parse the Entry Document as XML.") + raise e + + if self.parsed: + for element in self.dom.getchildren(): + field = self._canonical_tag(element.tag) + ssslog.debug("Attempting to intepret field: '%s'" % field) + if field == "atom_id" and element.text is not None: + self.atom_id = element.text.strip() + elif field == "atom_updated" and element.text is not None: + try: + self.updated = datetime.strptime(element.text.strip(), "%Y-%m-%dT%H:%M:%SZ") + except Exception as e: + ssslog.info("Unable to parse updated time: " + element.text.strip()) + elif field == "atom_link": + self._handle_link(element) + elif field == "atom_content": + self._handle_content(element) + elif field == "atom_generator": + uri = element.attrib.get("uri") + version = element.attrib.get("version") + self.generator = (uri, version) + elif field == "sword_packaging" and element.text is not None: + self.packaging.append(element.text.strip()) + elif field == "sword_verboseDescription" and element.text is not None: + self.verbose_description = element.text.strip() + elif field == "sword_treatment" and element.text is not None: + self.treatment = element.text.strip() + elif field.startswith("dcterms_") and element.text is not None: + field = field[8:] # get rid of the dcterms_ prefix + if self.dc_metadata.has_key(field): + self.dc_metadata[field].append(element.text.strip()) + else: + self.dc_metadata[field] = [element.text.strip()] + else: + if element.text is not None: # handle empty elements + if self.other_metadata.has_key(field): + self.other_metadata[field].append(element.text.strip()) + else: + self.other_metadata[field] = [element.text.strip()] + + + def _canonical_tag(self, tag): + ns, field = tag.rsplit("}", 1) + prefix = self.ns.prefix.get(ns[1:], ns[1:]) + return prefix + "_" + field + + def _handle_link(self, e): + """Method that handles the intepreting of <atom:link> element information and placing it into the anticipated attributes.""" + # MUST have rel + rel = e.attrib.get('rel', None) + if rel: + if rel == "edit": + self.edit_uri = e.attrib.get('href', None) + elif rel == "edit-media": + # FIXME: need to better handle uris with types + self.em_uris.append((e.attrib.get('href', None), e.attrib.get("type", None))) + # only put the edit-media iri in the convenience attribute if + # there is no 'type' + #if not ('type' in e.attrib.keys()): + # self.edit_media = e.attrib.get('href', None) + #elif e.attrib['type'] == "application/atom+xml;type=feed": + # self.edit_media_feed = e.attrib.get('href', None) + elif rel == "http://purl.org/net/sword/terms/add": + self.se_uri = e.attrib.get('href', None) + elif rel == "alternate": + self.alternate_uri = e.attrib.get('href', None) + elif rel == "http://purl.org/net/sword/terms/statement": + self.state_uris.append((e.attrib.get('href', None), e.attrib.get("type", None))) + elif rel == "http://purl.org/net/sword/terms/originalDeposit": + self.original_deposit_uri = e.attrib.get("href", None) + elif rel == "http://purl.org/net/sword/terms/derivedResource": + # FIXME: doesn't handle types + self.derived_resource_uris.append(e.attrib.get("href", None)) + + # Put all links into .links attribute, with all element attribs + attribs = {} + for k,v in e.attrib.iteritems(): + if k != "rel": + attribs[k] = v + if self.links.has_key(rel): + self.links[rel].append(attribs) + else: + self.links[rel] = [attribs] + + + def _handle_content(self, e): + """Method to intepret the <atom:content> elements.""" + # eg <content type="application/zip" src="http://swordapp.org/cont-IRI/43/my_deposit"/> + if e.attrib.has_key("src"): + src = e.attrib['src'] + info = dict(e.attrib).copy() + del info['src'] + #self.content[src] = info # FIXME: this class isn't generic enough yet to do this + self.content_uri = src + def serialise(self): # the main entry document room entry = etree.Element(self.ns.ATOM + "entry", nsmap=self.drmap) @@ -202,6 +313,12 @@ # now embed all the metadata as foreign markup for field in self.dc_metadata.keys(): + # ensure it's a list (common mistake) + if not isinstance(self.dc_metadata[field], list): + self.dc_metadata[field] = [self.dc_metadata[field]] + if field.startswith("dcterms_"): + # a potentially common mistake? + field = field[8:] for v in self.dc_metadata[field]: fdc = etree.SubElement(entry, self.ns.DC + field) fdc.text = v @@ -265,6 +382,7 @@ od.set("rel", "http://purl.org/net/sword/terms/originalDeposit") od.set("href", self.original_deposit_uri) + # FIXME: doesn't handle types # Derived Resources if self.derived_resource_uris is not None: for uri in self.derived_resource_uris: Modified: sss/branches/sss-2/sss/spec.py =================================================================== --- sss/branches/sss-2/sss/spec.py 2012-01-20 10:54:06 UTC (rev 457) +++ sss/branches/sss-2/sss/spec.py 2012-01-22 17:09:24 UTC (rev 458) @@ -4,6 +4,7 @@ from sss_logging import logging ssslog = logging.getLogger(__name__) +# FIXME: this is a poorly constructed object class Namespaces(object): """ This class encapsulates all the namespace declarations that we will need @@ -12,31 +13,49 @@ # AtomPub namespace and lxml format self.APP_NS = "http://www.w3.org/2007/app" self.APP = "{%s}" % self.APP_NS + self.APP_PREFIX = "app" # Atom namespace and lxml format self.ATOM_NS = "http://www.w3.org/2005/Atom" self.ATOM = "{%s}" % self.ATOM_NS + self.ATOM_PREFIX = "atom" # SWORD namespace and lxml format self.SWORD_NS = "http://purl.org/net/sword/terms/" self.SWORD = "{%s}" % self.SWORD_NS + self.SWORD_PREFIX = "sword" # Dublin Core namespace and lxml format self.DC_NS = "http://purl.org/dc/terms/" self.DC = "{%s}" % self.DC_NS + self.DC_PREFIX = "dcterms" # RDF namespace and lxml format self.RDF_NS = "http://www.w3.org/1999/02/22-rdf-syntax-ns#" self.RDF = "{%s}" % self.RDF_NS + self.RDF_PREFIX = "rdf" # ORE namespace and lxml format self.ORE_NS = "http://www.openarchives.org/ore/terms/" self.ORE = "{%s}" % self.ORE_NS + self.ORE_PREFIX = "ore" # ORE ATOM self.ORE_ATOM_NS = "http://www.openarchives.org/ore/atom/" self.ORE_ATOM = "{%s}" % self.ORE_ATOM_NS + self.ORE_ATOM_PREFIX = "oreatom" + # lookup dictionary + self.prefix = { + self.APP_NS : self.APP_PREFIX, + self.ATOM_NS : self.ATOM_PREFIX, + self.SWORD_NS : self.SWORD_PREFIX, + self.DC_NS : self.DC_PREFIX, + self.RDF_NS : self.RDF_PREFIX, + self.ORE_NS : self.ORE_PREFIX, + self.ORE_ATOM_NS : self.ORE_ATOM_PREFIX + } + class Errors(object): content = "http://purl.org/net/sword/error/ErrorContent" checksum_mismatch = "http://purl.org/net/sword/error/ErrorChecksumMismatch" Added: sss/branches/sss-2/tests/functional/test_entry.py =================================================================== --- sss/branches/sss-2/tests/functional/test_entry.py (rev 0) +++ sss/branches/sss-2/tests/functional/test_entry.py 2012-01-22 17:09:24 UTC (rev 458) @@ -0,0 +1,301 @@ +from . import TestController + +from datetime import datetime +from lxml import etree + +from sss import EntryDocument + +ATOM = "{http://www.w3.org/2005/Atom}" +SWORD = "{http://purl.org/net/sword/terms/}" +DC = "{http://purl.org/dc/terms/}" + +class TestConnection(TestController): + def test_01_blank_init(self): + e = EntryDocument() + + # check the meaningful default values + assert e.atom_id is not None + assert e.updated is not None + + g, v = e.generator + assert g == "http://www.swordapp.org/sss" + assert v is not None + + # check a couple of other things for emptyness + assert e.other_metadata is not None + assert len(e.other_metadata) == 0 + assert e.dc_metadata is not None + assert len(e.dc_metadata) == 0 + + def test_02_args_init(self): + + e = EntryDocument( + atom_id = "1234", + alternate_uri = "http://alternate/", + content_uri = "http://content/", + edit_uri = "http://edit/", + se_uri = "http://sword-edit/", + em_uris = [ + ("http://edit-media/1", "application/atom+xml"), + ("http://edit-media/2", "application/zip") + ], + packaging = ["http://packaging/"], + state_uris = [ + ("http://state/1", "application/atom+xml"), + ("http://state/2", "application/rdf+xml") + ], + updated = datetime.now(), + dc_metadata = { + "identifier" : "http://identifier/", + "rights" : "you can do this!", + "replaces" : "something else" + }, + verbose_description = "Verbose Description", + treatment = "Treatment", + original_deposit_uri = "http://original/", + derived_resource_uris = ["http://derived/1", "http://derived/2"] + ) + + assert e.atom_id == "1234" + assert e.alternate_uri == "http://alternate/" + assert e.content_uri == "http://content/" + assert e.edit_uri == "http://edit/" + assert e.se_uri == "http://sword-edit/" + assert len(e.em_uris) == 2 + assert "http://edit-media/1" in e.em_uris[0] + assert "application/zip" in e.em_uris[1] + assert len(e.packaging) == 1 + assert "http://packaging/" in e.packaging + assert len(e.state_uris) == 2 + assert "application/atom+xml" in e.state_uris[0] + assert "http://state/2" in e.state_uris[1] + assert e.updated is not None + assert len(e.dc_metadata) == 3 + assert "identifier" in e.dc_metadata.keys() + assert e.verbose_description == "Verbose Description" + assert e.treatment == "Treatment" + assert e.original_deposit_uri == "http://original/" + assert len(e.derived_resource_uris) == 2 + + def test_03_serialise(self): + e = EntryDocument( + atom_id = "1234", + alternate_uri = "http://alternate/", + content_uri = "http://content/", + edit_uri = "http://edit/", + se_uri = "http://sword-edit/", + em_uris = [ + ("http://edit-media/1", "application/atom+xml"), + ("http://edit-media/2", "application/zip") + ], + packaging = ["http://packaging/"], + state_uris = [ + ("http://state/1", "application/atom+xml"), + ("http://state/2", "application/rdf+xml") + ], + updated = datetime.now(), + dc_metadata = { + "identifier" : "http://identifier/", + "rights" : "you can do this!", + "replaces" : "something else" + }, + verbose_description = "Verbose Description", + treatment = "Treatment", + original_deposit_uri = "http://original/", + derived_resource_uris = ["http://derived/1", "http://derived/2"] + ) + + s = e.serialise() + + # does it parse as xml + xml = etree.fromstring(s) + + # now check the xml document and see if it ties in with the above + # attributes + has_id = False + has_alt = False + has_cont = False + has_edit = False + has_se = False + has_em_atom = False + has_em_zip = False + has_packaging = False + has_state_atom = False + has_state_rdf = False + has_updated = False + dc_count = 0 + has_vd = False + has_treatment = False + has_od = False + dr_count = 0 + for element in xml.getchildren(): + if element.tag == ATOM + "id": + assert element.text.strip() == "1234" + has_id = True + elif element.tag == ATOM + "content": + src = element.attrib.get("src") + assert src == "http://content/" + has_cont = True + elif element.tag == SWORD + "packaging": + assert element.text.strip() == "http://packaging/" + has_packaging = True + elif element.tag == ATOM + "updated": + has_updated = True + elif element.tag == DC + "identifier": + assert element.text.strip() == "http://identifier/" + dc_count += 1 + elif element.tag == DC + "rights": + assert element.text.strip() == "you can do this!" + dc_count += 1 + elif element.tag == DC + "replaces": + assert element.text.strip() == "something else" + dc_count += 1 + elif element.tag == SWORD + "verboseDescription": + assert element.text.strip() == "Verbose Description" + has_vd = True + elif element.tag == SWORD + "treatment": + assert element.text.strip() == "Treatment" + has_treatment = True + elif element.tag == ATOM + "link": + rel = element.attrib.get("rel") + if rel == "alternate": + assert element.attrib.get("href") == "http://alternate/" + has_alt = True + elif rel == "edit": + assert element.attrib.get("href") == "http://edit/" + has_edit = True + elif rel == "http://purl.org/net/sword/terms/add": + assert element.attrib.get("href") == "http://sword-edit/" + has_se= True + elif rel == "edit-media": + t = element.attrib.get("type") + if t == "application/atom+xml": + assert element.attrib.get("href") == "http://edit-media/1" + has_em_atom = True + elif t == "application/zip": + assert element.attrib.get("href") == "http://edit-media/2" + has_em_zip = True + else: + assert False + elif rel == "http://purl.org/net/sword/terms/statement": + t = element.attrib.get("type") + if t == "application/atom+xml": + assert element.attrib.get("href") == "http://state/1" + has_state_atom = True + elif t == "application/rdf+xml": + assert element.attrib.get("href") == "http://state/2" + has_state_rdf = True + else: + assert False + elif rel == "http://purl.org/net/sword/terms/originalDeposit": + assert element.attrib.get("href") == "http://original/" + has_od = True + elif rel == "http://purl.org/net/sword/terms/derivedResource": + assert element.attrib.get("href") in ["http://derived/1", "http://derived/2"] + dr_count += 1 + + # now check all our switches were appropriately thrown + assert has_id + assert has_alt + assert has_cont + assert has_edit + assert has_se + assert has_em_atom + assert has_em_zip + assert has_packaging + assert has_state_atom + assert has_state_rdf + assert has_updated + assert dc_count == 3 + assert has_vd + assert has_treatment + assert has_od + assert dr_count == 2 + + def test_04_round_trip_load(self): + e1 = EntryDocument( + atom_id = "1234", + alternate_uri = "http://alternate/", + content_uri = "http://content/", + edit_uri = "http://edit/", + se_uri = "http://sword-edit/", + em_uris = [ + ("http://edit-media/1", "application/atom+xml"), + ("http://edit-media/2", "application/zip") + ], + packaging = ["http://packaging/"], + state_uris = [ + ("http://state/1", "application/atom+xml"), + ("http://state/2", "application/rdf+xml") + ], + updated = datetime.now(), + dc_metadata = { + "identifier" : "http://identifier/", + "rights" : "you can do this!", + "replaces" : "something else" + }, + verbose_description = "Verbose Description", + treatment = "Treatment", + original_deposit_uri = "http://original/", + derived_resource_uris = ["http://derived/1", "http://derived/2"] + ) + + s = e1.serialise() + + # now create a new entry from the output + e = EntryDocument(xml_source=s) + + assert e.atom_id == "1234" + assert e.alternate_uri == "http://alternate/" + assert e.content_uri == "http://content/" + assert e.edit_uri == "http://edit/" + assert e.se_uri == "http://sword-edit/" + assert len(e.em_uris) == 2 + assert "http://edit-media/1" in e.em_uris[0] + assert "application/zip" in e.em_uris[1] + assert len(e.packaging) == 1 + assert "http://packaging/" in e.packaging + assert len(e.state_uris) == 2 + assert "application/atom+xml" in e.state_uris[0] + assert "http://state/2" in e.state_uris[1] + assert e.updated is not None + assert len(e.dc_metadata) == 3 + assert "identifier" in e.dc_metadata.keys() + assert e.verbose_description == "Verbose Description" + assert e.treatment == "Treatment" + assert e.original_deposit_uri == "http://original/" + assert len(e.derived_resource_uris) == 2 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |