#131 Improve the flow of results for local execution

Next_Release
closed-fixed
Client (32)
5
2011-07-13
2011-06-09
No

From this forum thread: http://www.jppf.org/forums/index.php/topic,1587.0.html

Currently, resultsReceived events behave differently for local and remote execution.

The resultsReceived events can be used to monitor job progress.

Currently:
- LOCAL: resultsReceived is fired only after all tasks are finished
- REMOTE: resultsReceived is fired after a node finishes a task bundle

Suggested enhancement for local execution:

LocalExecutionThread::run()
  //----- I SUGGEST FIRING 'resultsReceived' HERE
  for (Future<?> f: futures) f.get();
  int n = futures.size();
  if (job.getResultListener() != null)
  {
    synchronized(job.getResultListener())
    {
      job.getResultListener().resultsReceived(new TaskResultEvent(tasks));
    }
  }

Simple fix:
  for (Future<?> f: futures)
  {
    f.get();
    if (job.getResultListener() != null)
    {
      synchronized(job.getResultListener())
      {
        job.getResultListener().resultsReceived(new TaskResultEvent(
          Collections.singletonList(<task from Future f>)));
      }
    }
  }

Better fix (accumulated results): similar to the simple fix, but check whether the next future in futures 'isDone'.
TRUE => add its result to the collection; FALSE => fire resultsReceived for the accumulated collection of done futures.
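
A minimal sketch of the accumulating variant described above, written against plain java.util.concurrent types rather than JPPF classes. The class name AccumulatedResultsSketch, the drain method, and the Consumer-based listener are hypothetical stand-ins for the job's result listener; none of them are JPPF API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/** Illustrative sketch only; not the JPPF implementation. */
final class AccumulatedResultsSketch {

  /**
   * Walks the futures in submission order. Results of futures that are
   * already done are accumulated; when a future that is not yet done is
   * reached, the accumulated batch is handed to the listener before
   * blocking on that future.
   */
  static <T> void drain(List<Future<T>> futures, Consumer<List<T>> listener)
      throws InterruptedException, ExecutionException {
    List<T> batch = new ArrayList<>();
    for (Future<T> f : futures) {
      if (!f.isDone() && !batch.isEmpty()) {
        // Next future is still running: notify with what has accumulated so far.
        listener.accept(batch);
        batch = new ArrayList<>();
      }
      // Returns immediately if the future is done, otherwise blocks until it is.
      batch.add(f.get());
    }
    // Flush whatever remains after the last future.
    if (!batch.isEmpty()) listener.accept(batch);
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<Future<Integer>> futures = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
      final int value = i;
      Callable<Integer> task = () -> value;
      futures.add(pool.submit(task));
    }
    // The listener stands in for resultsReceived; batch boundaries depend on timing.
    drain(futures, batch -> System.out.println("resultsReceived: " + batch));
    pool.shutdown();
  }
}
```

Every result is delivered exactly once and in submission order; only the number and size of the batches vary with how quickly the tasks complete.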

Discussion

  • Martin JANDA

    Martin JANDA - 2011-06-11

    I have proposed a patch for the 'extended' version that accumulates task results for 50 ms and then notifies resultsReceived.

     
  • Martin JANDA

    Martin JANDA - 2011-06-11

    *** /home/jandam/java/ext/jppf/2.5/JPPF-2.5-full-src/client/src/java/org/jppf/client/loadbalancer/LoadBalancer.java 2011-06-02 10:58:52.000000000 +0200
    --- /home/jandam/java/ext/jppf/2.5/JPPF-2.5-dev/client/src/java/org/jppf/client/loadbalancer/LoadBalancer.java 2011-06-11 19:26:37.000000000 +0200
    ***************
    *** 55,60 ****
    --- 55,66 ----
    * Index for remote bundler.
    */
    private static final int REMOTE = 1;
    +
    + /**
    + * Timeout for accumulating task results
    + */
    + private static final long TIMEOUT_LIMIT_NS = 50L * 1000000L;
    +
    /**
    * Determines whether local execution is enabled on this client.
    */
    ***************
    *** 274,295 ****
    try
    {
    long start = System.currentTimeMillis();
    ! List<Future<?>> futures = new ArrayList<Future<?>>();
    for (JPPFTask task: tasks)
    {
    task.setDataProvider(job.getDataProvider());
    ! futures.add(threadPool.submit(new TaskWrapper(task)));
    ! }
    ! for (Future<?> f: futures) f.get();
    ! int n = futures.size();
    ! if (debugEnabled) log.debug("received " + n + " tasks from local executor" + (n > 0 ? ", first position=" + tasks.get(0).getPosition() : ""));
    ! if (job.getResultListener() != null)
    ! {
    ! synchronized(job.getResultListener())
    ! {
    ! job.getResultListener().resultsReceived(new TaskResultEvent(tasks));
    ! }
    }
    double elapsed = System.currentTimeMillis() - start;
    bundlers[LOCAL].feedback(tasks.size(), elapsed);
    }
    --- 280,318 ----
    try
    {
    long start = System.currentTimeMillis();
    ! int count = 0;
    ! ExecutorCompletionService<JPPFTask> ecs = new ExecutorCompletionService<JPPFTask>(threadPool);
    ! List<Future<JPPFTask>> futures = new ArrayList<Future<JPPFTask>>(tasks.size());
    for (JPPFTask task: tasks)
    {
    task.setDataProvider(job.getDataProvider());
    ! futures.add(ecs.submit(new TaskWrapper(task), task));
    }
    +
    + while(count < futures.size()) {
    + List<JPPFTask> taskList = new ArrayList<JPPFTask>();
    + taskList.add(ecs.take().get());
    + count++;
    +
    + Future<JPPFTask> future;
    + long finish = System.nanoTime() + TIMEOUT_LIMIT_NS;
    + long timeout = finish - System.nanoTime();
    + while(count < futures.size() && timeout > 0L && (future = ecs.poll(timeout, TimeUnit.NANOSECONDS)) != null) {
    + taskList.add(future.get());
    + timeout = finish - System.nanoTime();
    + count++;
    + }
    +
    + int n = taskList.size();
    + if (debugEnabled) log.debug("received " + n + " tasks from local executor" + (n > 0 ? ", first position=" + taskList.get(0).getPosition() : ""));
    + if (job.getResultListener() != null)
    + {
    + synchronized(job.getResultListener())
    + {
    + job.getResultListener().resultsReceived(new TaskResultEvent(taskList));
    + }
    + }
    + }
    double elapsed = System.currentTimeMillis() - start;
    bundlers[LOCAL].feedback(tasks.size(), elapsed);
    }

     
  • Laurent Cohen

    Laurent Cohen - 2011-07-02
    • assigned_to: nobody --> lolocohen
     
  • Laurent Cohen

    Laurent Cohen - 2011-07-13

    This feature is now implemented. We have improved on the original request by also sending a notification when the number of accumulated results reaches a specified threshold. This makes it possible to use a size threshold, a time threshold, or both.

    The behavior is configurable via 3 properties:

    1) jppf.local.execution.accumulation.time
    Maximum time to wait before notifying of available local execution results. The default value is Long.MAX_VALUE

    2) jppf.local.execution.accumulation.time.unit
    Unit in which the accumulation time is expressed. Possible values: n = nanos | m = millis | s = seconds | M = minutes | h = hours | d = days. The default value is n (nanos)

    3) jppf.local.execution.accumulation.size
    Maximum number of available local execution results before sending a notification. The default value is Integer.MAX_VALUE.

    Changes committed to SVN:
    trunk: revision 1730
    branch b2.5: revision 1731
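
How a size threshold and a time threshold could combine is sketched below, under stated assumptions. This is not the code committed to JPPF: the class ThresholdAccumulatorSketch and its methods are hypothetical, and only the three property names, their defaults, and the unit codes are taken from the comment above. Falling back to nanoseconds for an unrecognised unit code is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Illustrative sketch only; not the JPPF implementation. */
final class ThresholdAccumulatorSketch {

  /** Maps the unit codes from the ticket to TimeUnit; unknown codes fall back to nanos (assumption). */
  static TimeUnit parseUnit(String code) {
    switch (code) {
      case "m": return TimeUnit.MILLISECONDS;
      case "s": return TimeUnit.SECONDS;
      case "M": return TimeUnit.MINUTES;
      case "h": return TimeUnit.HOURS;
      case "d": return TimeUnit.DAYS;
      default:  return TimeUnit.NANOSECONDS;
    }
  }

  /** Collects 'total' results, closing a batch when the size or the time threshold is reached. */
  static <T> List<List<T>> collect(CompletionService<T> ecs, int total, Properties config)
      throws Exception {
    long maxTime = Long.parseLong(config.getProperty(
        "jppf.local.execution.accumulation.time", String.valueOf(Long.MAX_VALUE)));
    TimeUnit unit = parseUnit(config.getProperty(
        "jppf.local.execution.accumulation.time.unit", "n"));
    int maxSize = Integer.parseInt(config.getProperty(
        "jppf.local.execution.accumulation.size", String.valueOf(Integer.MAX_VALUE)));
    long maxNanos = unit.toNanos(maxTime); // saturates at Long.MAX_VALUE

    List<List<T>> batches = new ArrayList<>();
    int count = 0;
    while (count < total) {
      List<T> batch = new ArrayList<>();
      batch.add(ecs.take().get()); // block until at least one result is available
      count++;
      long start = System.nanoTime();
      while (count < total && batch.size() < maxSize) {
        // Elapsed-time arithmetic avoids overflow when maxNanos is Long.MAX_VALUE.
        long remaining = maxNanos - (System.nanoTime() - start);
        if (remaining <= 0L) break;
        Future<T> f = ecs.poll(remaining, TimeUnit.NANOSECONDS);
        if (f == null) break; // time threshold reached
        batch.add(f.get());
        count++;
      }
      batches.add(batch); // a real client would fire resultsReceived here
    }
    return batches;
  }

  public static void main(String[] args) throws Exception {
    Properties config = new Properties();
    config.setProperty("jppf.local.execution.accumulation.size", "2");
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CompletionService<Integer> ecs = new ExecutorCompletionService<>(pool);
    for (int i = 0; i < 5; i++) {
      final int value = i;
      ecs.submit(() -> value);
    }
    System.out.println(collect(ecs, 5, config));
    pool.shutdown();
  }
}
```

In this sketch, leaving both thresholds at the defaults quoted above yields a single batch holding all results, while setting only the size to 2 for five tasks yields batches of two, two, and one.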

     
  • Laurent Cohen

    Laurent Cohen - 2011-07-13
    • status: open --> closed-fixed
     
