From: kiorky <svn...@pl...> - 2011-01-30 21:05:15
|
Author: kiorky Date: Sun Jan 30 21:03:07 2011 New Revision: 47124 Modified: plone.app.async/branches/defer/src/plone/app/async/README.txt plone.app.async/branches/defer/src/plone/app/async/interfaces.py plone.app.async/branches/defer/src/plone/app/async/service.py plone.app.async/branches/defer/src/plone/app/async/tests/test_simplejob.py Log: Now, we can defer jobs for later processing. Modified: plone.app.async/branches/defer/src/plone/app/async/README.txt ============================================================================== --- plone.app.async/branches/defer/src/plone/app/async/README.txt (original) +++ plone.app.async/branches/defer/src/plone/app/async/README.txt Sun Jan 30 21:03:07 2011 @@ -79,7 +79,7 @@ Because by default the jobs are executed with the default quota set to 1, (i.e. only one job can be executed at a time), jobs are executed serially and according to the order by which they were submitted. Hence, waiting for the -job that submits the document implies that the one that created it has already +job that submits the document implies that the one that created it has already been carried out. >>> wait_for_result(job2) @@ -108,6 +108,19 @@ If you want to execute jobs in parallel, you can use ``queueParallelJobs``. +Deferring Jobs +----------------------- +You can defer Jobs by using one of the following methods, which mirror the AsyncService standard methods. But rather than executing the job on the fly, they defer the job for later execution. + + * queueDeferredJob + * queueDeferredSerialJobs + * queueDeferredParallelJobs + * queueDeferredJobInQueue + * queueDeferredSerialJobsInQueue + * queueDeferredParallelJobsInQueue + +You can refer to the ``plone.app.async.interfaces.IAsyncService`` interface for the exact signature of those, but basically they take a ``begin_after`` (**datetime**) argument to specify the launch time. 
+ Security and user permissions ----------------------------- @@ -139,7 +152,7 @@ >>> results ['Success: 42'] -Failures can be handled in the same way. +Failures can be handled in the same way. >>> results = [] >>> def failingJob(context): Modified: plone.app.async/branches/defer/src/plone/app/async/interfaces.py ============================================================================== --- plone.app.async/branches/defer/src/plone/app/async/interfaces.py (original) +++ plone.app.async/branches/defer/src/plone/app/async/interfaces.py Sun Jan 30 21:03:07 2011 @@ -34,17 +34,63 @@ """ def queueJobInQueue(queue, quota_names, func, context, *args, **kwargs): - """Queue a job in a specific queue.""" + """Queue a job in a specific queue. + Looks into the kwargs for: + plone.app.async.service.DEFERRED_JOB_KEY + If it is present, use the value (datetime) to set + the 'begin_after' queue.put argument to defer the job start + """ + + def queueSerialJobsInQueue(queue, quota_names, *job_infos, **kwargs): + """Queue several jobs in a specific queue, to be run serially + + job_info is a tuple with (func, context, args, kwargs). + """ + + def queueParallelJobsInQueue(queue, quota_names, *job_infos, **kwargs): + """Queue several jobs in a specific queue, to be run in parallel + + job_info is a tuple with (func, context, args, kwargs). + """ + + def queueDeferredJob(func, begin_after, context, *args, **kwargs): + """Queue a job. + begin_after : datetime after which the job can be launched + """ + + def queueDeferredJobInQueue(queue, quota_names, func, begin_after, context, *args, **kwargs): + """Queue a job in a specific queue. 
+ Looks into the kwargs for: + plone.app.async.service.DEFERRED_JOB_KEY + If it is present, use the value (datetime) to set + the 'begin_after' queue.put argument to defer the job start + """ - def queueSerialJobsInQueue(queue, quota_names, *job_infos): + def queueDeferredSerialJobsInQueue(queue, quota_names, begin_after, *job_infos): """Queue several jobs in a specific queue, to be run serially + begin_after : datetime after which the job can be launched job_info is a tuple with (func, context, args, kwargs). """ - def queueParallelJobsInQueue(queue, quota_names, *job_infos): + def queueDeferredParallelJobsInQueue(queue, quota_names, begin_after, *job_infos): """Queue several jobs in a specific queue, to be run in parallel + begin_after : datetime after which the job can be launched + job_info is a tuple with (func, context, args, kwargs). + """ + + def queueDeferredSerialJobs(begin_after, *job_infos): + """Queue several jobs, to be run serially + + begin_after : datetime after which the job can be launched + job_info is a tuple with (func, context, args, kwargs). + """ + + def queueDeferredParallelJobs(begin_after, *job_infos): + """Queue several jobs, to be run in parallel + + begin_after : datetime after which the job can be launched job_info is a tuple with (func, context, args, kwargs). """ Modified: plone.app.async/branches/defer/src/plone/app/async/service.py ============================================================================== --- plone.app.async/branches/defer/src/plone/app/async/service.py (original) +++ plone.app.async/branches/defer/src/plone/app/async/service.py Sun Jan 30 21:03:07 2011 @@ -80,6 +80,13 @@ notify(JobFailure(result)) +DEFERRED_JOB_KEY = 'asyncjob_begin_after' +def get_begin_after(kwargs): + begin_after = kwargs.get(DEFERRED_JOB_KEY, None) + if DEFERRED_JOB_KEY in kwargs: + del kwargs[DEFERRED_JOB_KEY] + return begin_after + class AsyncService(threading.local): """Utility providing async execution services to Plone. 
""" @@ -100,16 +107,22 @@ return self._conn.root()[KEY] def queueJobInQueue(self, queue, quota_names, func, context, *args, **kwargs): - """Queue a job in the specified queue.""" + """Queue a job in the specified queue. + Looks into the kwargs for: + plone.app.async.service.DEFERRED_JOB_KEY + If it is present, use the value (datetime) to set + the 'begin_after' queue.put argument to defer the job start + """ portal = getUtility(ISiteRoot) portal_path = portal.getPhysicalPath() context_path = context.getPhysicalPath() uf_path, user_id = _getAuthenticatedUser() + begin_after = get_begin_after(kwargs) job = Job(_executeAsUser, context_path, portal_path, uf_path, user_id, func, *args, **kwargs) if quota_names: job.quota_names = quota_names - job = queue.put(job) + job = queue.put(job, begin_after = begin_after) job.addCallbacks(success=job_success_callback, failure=job_failure_callback) return job @@ -119,11 +132,28 @@ queue = self.getQueues()[''] return self.queueJobInQueue(queue, ('default',), func, context, *args, **kwargs) - def _queueJobsInQueue(self, queue, quota_names, job_infos, serialize=True): - """Queue multiple jobs in the specified queue.""" + def queueDeferredJob(self, func, begin_after, context, *args, **kwargs): + """Queue a deferred job in the default queue.""" + queue = self.getQueues()[''] + kwargs[DEFERRED_JOB_KEY] = begin_after + return self.queueJobInQueue(queue, ('default',), func, context, *args, **kwargs) + + def queueDeferredJobInQueue(self, queue, quota_names, func, begin_after, context, *args, **kwargs): + """Queue a deferred job in the specified queue.""" + kwargs[DEFERRED_JOB_KEY] = begin_after + return self.queueJobInQueue(queue, ('default',), func, context, *args, **kwargs) + + def _queueJobsInQueue(self, queue, quota_names, job_infos, serialize=True, **kwargs): + """Queue multiple jobs in the specified queue. 
+ Looks into the kwargs for: + plone.app.async.service.DEFERRED_JOB_KEY + If it is present, use the value (datetime) to set + the 'begin_after' queue.put argument to defer the job start + """ portal = getUtility(ISiteRoot) portal_path = portal.getPhysicalPath() uf_path, user_id = _getAuthenticatedUser() + begin_after = get_begin_after(kwargs) scheduled = [] for (func, context, args, kwargs) in job_infos: context_path = context.getPhysicalPath() @@ -136,18 +166,18 @@ job = parallel(*scheduled) if quota_names: job.quota_names = quota_names - job = queue.put(job) + job = queue.put(job, begin_after = begin_after) job.addCallbacks(success=job_success_callback, failure=job_failure_callback) return job - def queueSerialJobsInQueue(self, queue, quota_names, *job_infos): + def queueSerialJobsInQueue(self, queue, quota_names, *job_infos, **kwargs): """Queue serial jobs in the specified queue.""" - return self._queueJobsInQueue(queue, quota_names, job_infos, serialize=True) + return self._queueJobsInQueue(queue, quota_names, job_infos, serialize=True, **kwargs) - def queueParallelJobsInQueue(self, queue, quota_names, *job_infos): + def queueParallelJobsInQueue(self, queue, quota_names, *job_infos, **kwargs): """Queue parallel jobs in the specified queue.""" - return self._queueJobsInQueue(queue, quota_names, job_infos, serialize=False) + return self._queueJobsInQueue(queue, quota_names, job_infos, serialize=False, **kwargs) def queueSerialJobs(self, *job_infos): """Queue serial jobs in the default queue.""" @@ -158,3 +188,27 @@ """Queue parallel jobs in the default queue.""" queue = self.getQueues()[''] return self.queueParallelJobsInQueue(queue, ('default',), *job_infos) + + def queueDeferredSerialJobsInQueue(self, queue, quota_names, begin_after, *job_infos): + """Queue serial jobs in the specified queue.""" + kwargs = {DEFERRED_JOB_KEY : begin_after} + return self._queueJobsInQueue(queue, quota_names, job_infos, serialize=True, **kwargs) + + def 
queueDeferredParallelJobsInQueue(self, queue, quota_names, begin_after, *job_infos): + """Queue parallel jobs in the specified queue.""" + kwargs = {DEFERRED_JOB_KEY : begin_after} + return self._queueJobsInQueue(queue, quota_names, job_infos, serialize=False, **kwargs) + + def queueDeferredSerialJobs(self, begin_after, *job_infos): + """Queue serial jobs in the default queue.""" + queue = self.getQueues()[''] + kwargs = {DEFERRED_JOB_KEY : begin_after} + return self.queueSerialJobsInQueue(queue, ('default',), *job_infos, **kwargs) + + def queueDeferredParallelJobs(self, begin_after, *job_infos): + """Queue parallel jobs in the default queue.""" + queue = self.getQueues()[''] + kwargs = {DEFERRED_JOB_KEY : begin_after} + return self.queueParallelJobsInQueue(queue, ('default',), *job_infos, **kwargs) + + Modified: plone.app.async/branches/defer/src/plone/app/async/tests/test_simplejob.py ============================================================================== --- plone.app.async/branches/defer/src/plone/app/async/tests/test_simplejob.py (original) +++ plone.app.async/branches/defer/src/plone/app/async/tests/test_simplejob.py Sun Jan 30 21:03:07 2011 @@ -1,11 +1,14 @@ +import datetime +import pytz import transaction from zope.component import getUtility -from zc.async.testing import wait_for_result +from zc.async.testing import wait_for_result, set_now, setUpDatetime, tearDownDatetime from Products.PloneTestCase.PloneTestCase import default_user from Products.CMFCore.utils import getToolByName from plone.app.async.tests.base import AsyncTestCase from plone.app.async.interfaces import IAsyncService from plone.app.async.service import makeJob +from plone.app.async.service import get_begin_after def addNumbers(context, x1, x2): @@ -183,7 +186,17 @@ # not accessible by anon self.assertEqual(wait_for_result(job), 0) - + def test_getafter(self): + """test_getafter.""" + data = {} + ba = get_begin_after(data) + self.assertEquals(ba, None) + self.assertEquals(data, {}) 
+ data = {'asyncjob_begin_after': 'foo'} + ba = get_begin_after(data) + self.assertEquals(ba, 'foo') + self.assertEquals(data, {}) + def test_suite(): from unittest import TestSuite, makeSuite suite = TestSuite() |