From: Alec M. <svn...@pl...> - 2008-01-13 20:51:12
|
Author: alecm Date: Sun Jan 13 20:51:17 2008 New Revision: 18932 Added: Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/interfaces/workflow.py Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/workflow.py Modified: Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/WorkflowTool.py Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/configure.zcml Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/interfaces/__init__.py Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/tests/dummy.py Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/tests/testWorkflowTool.py Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/tests/test_doctests.py Log: Initial implementation and tests for PLIP 217 Modified: Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/WorkflowTool.py ============================================================================== --- Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/WorkflowTool.py (original) +++ Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/WorkflowTool.py Sun Jan 13 20:51:17 2008 @@ -1,9 +1,11 @@ +from zope.component import getMultiAdapter from Products.CMFCore.interfaces import IConfigurableWorkflowTool from Products.CMFCore.utils import getToolByName from Products.CMFCore.WorkflowTool import WorkflowTool as BaseTool from Products.CMFPlone import ToolNames from Products.CMFPlone.utils import base_hasattr +from Products.CMFPlone.interfaces import IWorkflowChain from ZODB.POSException import ConflictError from Acquisition import aq_base, aq_parent, aq_inner @@ -13,10 +15,6 @@ from Products.DCWorkflow.Transitions import TRIGGER_USER_ACTION from Products.CMFPlone.PloneBaseTool import PloneBaseTool -try: - from Products.CMFPlacefulWorkflow.PlacefulWorkflowTool import WorkflowPolicyConfig_id -except: - WorkflowPolicyConfig_id = '.wf_policy_config' class 
WorkflowTool(PloneBaseTool, BaseTool): @@ -250,89 +248,6 @@ # Return the default chain. return self._default_chain - security.declarePrivate('getChainFor') - def getChainFor(self, ob): - """ Get the chain that applies to the given object. - - Goal: find a workflow chain in a policy - - Steps: - 1. ask the object if it contains a policy - 2. if it does, ask him for a chain - 3. if there's no chain for the type the we loop on the parent - 4. if the parent is the portal object or None we stop and we ask to portal_workflow - - Hint: - If ob was a string, ask directly portal_worlfow\n\n - """ - - cbt = self._chains_by_type - chain = None - - if type(ob) == type(''): - # We are not in an object, then we can only get default from portal_workflow - portal_type = ob - if cbt is not None: - chain = cbt.get(portal_type, None) - # Note that if chain is not in cbt or has a value of None, we use a default chain. - if chain is None: - chain = self.getDefaultChainFor(ob) - if chain is None: - # CMFCore default - return () - - elif hasattr(aq_base(ob), '_getPortalTypeName'): - portal_type = ob._getPortalTypeName() - else: - portal_type = None - - if portal_type is None or ob is None: - return () - - # Take some extra care when ob is a string - is_policy_container = False - objectids = [] - try: - objectids = ob.objectIds() - except AttributeError, TypeError: - pass - if WorkflowPolicyConfig_id in objectids: - is_policy_container = True - - # Inspired by implementation in CPSWorkflowTool.py of CPSCore 3.9.0 - # Workflow needs to be determined by true containment not context - # so we loop over the actual containers - chain = None - wfpolicyconfig = None - current_ob = aq_inner(ob) - # start_here is used to check 'In policy': We check it only in the first folder - start_here = True - portal = aq_base(getToolByName(self, 'portal_url').getPortalObject()) - while chain is None and current_ob is not None: - if base_hasattr(current_ob, WorkflowPolicyConfig_id): - wfpolicyconfig = 
getattr(current_ob, WorkflowPolicyConfig_id) - chain = wfpolicyconfig.getPlacefulChainFor(portal_type, start_here=start_here) - if chain is not None: - return chain - - elif aq_base(current_ob) is portal: - break - start_here = False - current_ob = aq_inner(aq_parent(current_ob)) - - # Note that if chain is not in cbt or has a value of None, we use a default chain. - if cbt is not None: - chain = cbt.get(portal_type, None) - # Note that if chain is not in cbt or has a value of - # None, we use a default chain. - if chain is None: - chain = self.getDefaultChainFor(ob) - if chain is None: - # CMFCore default - return () - - return chain - security.declareProtected(ManagePortal, 'listWorkflows') def listWorkflows(self): """ Return the list of workflows @@ -390,6 +305,15 @@ dup_list[key] = 1 return [(s.title, s.getId()) for s in states] + # PLIP 217 Workflow by adaptation + def getChainFor( self, ob ): + """ + Returns the chain that applies to the given object. + If we get a string as the ob parameter, use it as + the portal_type. 
+ """ + return getMultiAdapter( (ob, self), IWorkflowChain ) + WorkflowTool.__doc__ = BaseTool.__doc__ InitializeClass(WorkflowTool) Modified: Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/configure.zcml ============================================================================== --- Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/configure.zcml (original) +++ Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/configure.zcml Sun Jan 13 20:51:17 2008 @@ -74,4 +74,10 @@ provides="plone.app.content.interfaces.IIndexableObjectWrapper" factory=".CatalogTool.ExtensibleIndexableObjectWrapper" /> + <!-- Adapter for default workflow lookup --> + <adapter + factory=".workflow.ToolWorkflowChain" + trusted="y" + /> + </configure> Modified: Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/interfaces/__init__.py ============================================================================== --- Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/interfaces/__init__.py (original) +++ Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/interfaces/__init__.py Sun Jan 13 20:51:17 2008 @@ -21,6 +21,7 @@ from factory import IFactoryTool from translationservice import ITranslationServiceTool from breadcrumbs import IHideFromBreadcrumbs +from workflow import IWorkflowChain import ConstrainTypes import NonStructuralFolder Added: Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/interfaces/workflow.py ============================================================================== --- (empty file) +++ Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/interfaces/workflow.py Sun Jan 13 20:51:17 2008 @@ -0,0 +1,5 @@ +from zope.interface.common.sequence import IReadSequence + +class IWorkflowChain( IReadSequence ): + """ an interface denoting the cmf workflow name sequence + """ Modified: 
Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/tests/dummy.py ============================================================================== --- Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/tests/dummy.py (original) +++ Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/tests/dummy.py Sun Jan 13 20:51:17 2008 @@ -6,10 +6,11 @@ import os -from zope.interface import Interface, implements +from zope.interface import Interface, implements, implementer from zope.interface import Interface from Products.CMFPlone.interfaces import INonStructuralFolder +from Products.CMFPlone.interfaces import IWorkflowChain from ComputedAttribute import ComputedAttribute from OFS.SimpleItem import SimpleItem @@ -193,3 +194,28 @@ def disallow_delete_handler(obj, event): obj.delete_attempted = True raise Exception, "You can't delete this!" + + +class DummyContent(Dummy): + """Dummy DynamicType object""" + + def getPortalTypeName(self): + return getattr(self, 'portal_type') + +class DummyWorkflowTool(object): + """A dummy workflow tool for testing adaptation based workflow""" + + def __init__(self, id='portal_workflow'): + self._chains_by_type = {} + + def setChainForPortalTypes(self, types, chain): + for ptype in types: + self._chains_by_type[ptype] = chain + + def getDefaultChainFor(self, context): + return ('Default Workflow',) + +@implementer(IWorkflowChain) +def DummyWorkflowChainAdapter(context, tool): + """A dummy adapter to IWorkflowChain""" + return ('Static Workflow',) Modified: Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/tests/testWorkflowTool.py ============================================================================== --- Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/tests/testWorkflowTool.py (original) +++ Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/tests/testWorkflowTool.py Sun Jan 13 20:51:17 2008 @@ -2,7 +2,13 @@ # Tests the workflow tool # +from 
zope.interface import directlyProvides +from zope.component import provideAdapter, getGlobalSiteManager + from Products.CMFPlone.tests import PloneTestCase +from Products.CMFPlone.tests.dummy import Dummy, DummyWorkflowChainAdapter +from Products.CMFDefault.interfaces import IDocument +from Products.CMFCore.interfaces import IWorkflowTool default_user = PloneTestCase.default_user @@ -107,6 +113,19 @@ self.assertEqual(len(internal_pub_states), all_states.count('internally_published')) + def testAdaptationBasedWorkflowOverride(self): + # We take a piece of dummy content and register a dummy + # workflow chain adapter for it. + content = Dummy() + directlyProvides(content, IDocument) + provideAdapter(DummyWorkflowChainAdapter, + adapts=(IDocument, IWorkflowTool)) + self.assertEqual(self.workflow.getChainFor(content), + ('Static Workflow',)) + # undo our registration so we don't break tests + components = getGlobalSiteManager() + components.unregisterAdapter(DummyWorkflowChainAdapter, + required=(IDocument, IWorkflowTool)) def test_suite(): from unittest import TestSuite, makeSuite Modified: Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/tests/test_doctests.py ============================================================================== --- Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/tests/test_doctests.py (original) +++ Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/tests/test_doctests.py Sun Jan 13 20:51:17 2008 @@ -20,6 +20,7 @@ test_class=PloneTestCase.FunctionalTestCase), DocTestSuite('Products.CMFPlone.TranslationServiceTool'), DocTestSuite('Products.CMFPlone.utils'), + DocTestSuite('Products.CMFPlone.workflow'), ) return TestSuite(suites) Added: Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/workflow.py ============================================================================== --- (empty file) +++ Products.CMFPlone/branches/adapterized-workflow/Products/CMFPlone/workflow.py Sun 
Jan 13 20:51:17 2008 @@ -0,0 +1,62 @@ +from zope.interface import Interface, implementer +from zope.component import adapter +from Acquisition import aq_base, aq_inner +from Globals import PersistentMapping +from Products.CMFCore.interfaces import IWorkflowTool +from Products.CMFPlone.interfaces import IWorkflowChain + +@adapter(Interface, IWorkflowTool) +@implementer(IWorkflowChain) +def ToolWorkflowChain( context, workflow_tool ): + """Looks up the workflow chain by portal type using a mapping + stored on the tool:: + + >>> from Products.CMFPlone.tests.dummy import DummyContent, DummyWorkflowTool + >>> tool = DummyWorkflowTool() + >>> content = DummyContent(id='dummy', portal_type='DummyType') + + Either an object with a portal_type or the portal_type as a + string. If we pass in an unknown portal_type we get the default + chain:: + + >>> ToolWorkflowChain('A Type', tool) + ('Default Workflow',) + >>> tool.setChainForPortalTypes(('A Type',), ('Some Workflow',)) + >>> ToolWorkflowChain('A Type', tool) + ('Some Workflow',) + + When we pass in a piece of content we get similar behavior:: + + >>> ToolWorkflowChain(content, tool) + ('Default Workflow',) + >>> tool.setChainForPortalTypes(('DummyType',), ('Some Workflow',)) + >>> ToolWorkflowChain(content, tool) + ('Some Workflow',) + + If we can't figure out a portal_type then we return an empty chain:: + + >>> ToolWorkflowChain((), tool) + () + + """ + if isinstance(context, basestring): + pt = context + elif hasattr(aq_base(context), 'getPortalTypeName'): + pt = context.getPortalTypeName() + else: + pt = None + if pt is None: + return () + chain = None + + # Unfortunately we need to rely on a private variable here + cbt = workflow_tool._chains_by_type + + if cbt is not None: + chain = cbt.get(pt, None) + if chain is None: + chain = workflow_tool.getDefaultChainFor(context) + if chain is None: + return () + return chain + |