Sample usage

Krzysztof Dębski

package pfor.sample;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import pfor.CompletionCallback;
import pfor.Merger;
import pfor.PArray;
import pfor.PTask;
import pfor.ParallelContext;
import pfor.ProgressCallback;
import pfor.annotations.Merge;
import pfor.annotations.Speed;
import pfor.annotations.Merge.MergeType;
import pfor.annotations.Speed.SpeedType;
import pfor.closures.index.tableindex.TableIndexTask;

/**
 * Sample usage of the pfor API: computes the total length of the strings in
 * an array by running one subtask per element and merging the partial
 * results, while reporting progress and completion through callbacks.
 */
public class PForTest {

/**
 * Builds the parallel task, attaches both callbacks, starts it asynchronously
 * and finally prints the merged result.
 *
 * @param args command-line arguments (not used)
 * @throws InterruptedException if the thread is interrupted while sleeping or
 *         while waiting for the result
 * @throws ExecutionException if the asynchronous computation failed
 */
public static void main(String[] args) throws InterruptedException, ExecutionException {
    final PArray pArray = ParallelContext.getDefaultInstance().getPArray();

    /* Input */
    final String[] words = { "One", "two", "three", "four" };

    /* Creation of the task: one action per array element plus a merger for the partial results */
    final PTask<Integer> plainTask = pArray.each(lengthOfEach(words), sumMerger());

    /* Register a completion callback (delivers the final result once the task is done) */
    final PTask<Integer> notifyingTask = plainTask.withCompletionCallback(resultPrinter());

    /* Register a progress observer (can be used to tell a user how many subtasks have completed so far) */
    final PTask<Integer> observedTask = notifyingTask.withProgressCallback(progressPrinter());

    /* Execute the task asynchronously */
    final Future<Integer> pending = observedTask.executeAsynchronously(false);

    /* Give the task some time to complete */
    Thread.sleep(100);

    /* Future.get() blocks until the task is completed.
     * The value is the same one the completion callback receives. */
    System.out.println("The sum of lengths of all strings: " + pending.get());
}

/**
 * Creates the per-index action: the length of the string stored at the given index.
 *
 * @param words the array the action reads from
 * @return an action mapping an index to the length of the string at that index
 */
private static TableIndexTask<String, Integer> lengthOfEach(final String[] words) {
    return new TableIndexTask<String, Integer>(words) {
        /* With the 'VERY_SLOW' setting each subtask is executed in its own batch
         * (for a four-element array that means four subtasks, one per string).
         * With 'MODERATE' a batch would contain up to 1000 consecutive subtasks.
         * A lower speed gives finer subtask granularity, at the price of more
         * thread congestion and higher memory usage.
         */
        @Speed(speed = SpeedType.VERY_SLOW)
        public Integer perform(int index) throws Exception {
            return table[index].length();
        }
    };
}

/**
 * Creates the merger that adds two partial results together.
 *
 * @return a merger summing its two arguments
 */
private static Merger<Integer> sumMerger() {
    return new Merger<Integer>() {
        /* The order of merging is irrelevant for a sum. A random-order merger
         * is the fastest one, but it is not always applicable. */
        @Merge(type = MergeType.RANDOM)
        public Integer merge(Integer i1, Integer i2) {
            return i1 + i2;
        }
    };
}

/**
 * Creates the completion callback that prints the final result.
 *
 * @return a callback printing the value held by the completed future
 */
private static CompletionCallback<Integer> resultPrinter() {
    return new CompletionCallback<Integer>() {
        public void onComplete(Future<Integer> future) {
            try {
                final Integer result = future.get();
                System.out.println("Completion callback received (the result):" + result);
            } catch (final Exception e) {
                /* Any failure while fetching the result is only reported here. */
                e.printStackTrace();
            }
        }
    };
}

/**
 * Creates the progress callback that prints the number of completed subtasks.
 * It is called asynchronously, many times during an execution, after each
 * batch of subtasks (the batch size is controlled with the @Speed annotation).
 *
 * @return a callback printing the reported progress value
 */
private static ProgressCallback progressPrinter() {
    return new ProgressCallback() {
        @Override
        public void onProgress(int i) {
            System.out.println("Progress callback received (number of completed subtasks):" + i);
        }
    };
}

}


Related

Wiki: Home