package pfor.sample;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import pfor.CompletionCallback;
import pfor.Merger;
import pfor.PArray;
import pfor.PTask;
import pfor.ParallelContext;
import pfor.ProgressCallback;
import pfor.annotations.Merge;
import pfor.annotations.Speed;
import pfor.annotations.Merge.MergeType;
import pfor.annotations.Speed.SpeedType;
import pfor.closures.index.tableindex.TableIndexTask;
public class PForTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
final ParallelContext context = ParallelContext.getDefaultInstance();
final PArray pArray = context.getPArray();
/* Input */
final String[] tab = { "One", "two", "three", "four" };
/* Action - computing length of a string in an array */
final TableIndexTask<String, Integer> action = new TableIndexTask<String, Integer>(tab) {
/* With 'VERY_SLOW' setting, each subtask will be executed in its own batch (in this example there will be 4 subtasks - one for each string)
* With 'MODERATE' there would be batches each containing up to 1000 subsequent subtasks.
* With a lower speed, granuality of subtasks is better, but the overhead of thread congestion and memory usage is also higher
*/
@Speed(speed = SpeedType.VERY_SLOW)
public Integer perform(int index) throws Exception {
return table[index].length();
}
};
/* Merger of results - order of merging is irrelevant. Random order merger is the fastest, but not always applicable. */
final Merger<Integer> merger = new Merger<Integer>() {
@Merge(type = MergeType.RANDOM)
public Integer merge(Integer i1, Integer i2) {
return i1 + i2;
}
};
/* Creation of task */
PTask<Integer> ptask = pArray.each(action, merger);
/* Register callback (delivery of a final result on a completion) */
ptask = ptask.withCompletionCallback(new CompletionCallback<Integer>() {
public void onComplete(Future<Integer> future) {
try {
System.out.println("Completion callback received (the result):" + future.get());
} catch (final Exception e) {
e.printStackTrace();
}
}
});
/* Register progress observer callback (can be used to inform a user how many subtasks have already been completed)
* Called asynchronously many times during a execution after each batch of tasks (size of a batch is controlled with @Speed annotation */
ptask = ptask.withProgressCallback(new ProgressCallback() {
@Override
public void onProgress(int i) {
System.out.println("Progress callback received (number of completed subtasks):" + i);
}
});
/* Execute the task asynchronously */
final Future<Integer> future = ptask.executeAsynchronously(false);
/* Give some time for a taks to complete */
Thread.sleep(100);
/* future.get() will block until the tasks is completed.
* The result will be the same, as in a completion callback */
System.out.println("The sum of lengths of all strings: " + future.get());
}
}