Saturday, June 17, 2017

Fork And Join

What is Fork & Join ?
Logically speaking its a simple divide and conquer technique.
if you have a big task , you can break it down to smaller tasks and then join results of each of the smaller tasks

Examples
  1. Quick sort, details here.
  2. Binary search
  3. Looks for a word in a document. You will implement the following two kinds of tasks:
    • A document task, which is going to search a word in a set of lines of a document
    • A line task, which is going to search a word in a part of the document

API Details
There is interface java.util.concurrent.ExecutorService that is has two main implementation
  1. AbstractExecutorService : parallel execute threads
  2. ForkJoinPool : Divide and Conquer
 Different methods
  1. execute(ForkJoinTask task) : asynchronous execution of different tasks`
  2. execute(Runnable task) : asynchronous but Note that the ForkJoinPool class doesn't use the work-stealing algorithm with Runnable objects
  3. invoke(ForkJoinTask task) : synchronous execution (This call doesn't return until the task passed as a parameter finishes its execution.)
  4. invokeAll(ForkJoinTask... tasks) : variable list of arguments. You can pass to it as parameters
  5. invokeAll(Collection tasks) : pass a collection
You should be aware of a big difference between the synchronous and asynchronous methods.

When you use the synchronized methods, the task that calls one of these methods (for example, the invokeAll() method) is suspended until the tasks it sent to the pool finish their execution. This allows the ForkJoinPool class to use the work-stealing algorithm to assign a new task to the worker thread that executed the sleeping task.

On the contrary, when you use the asynchronous methods (for example, the fork() method), the task continues with its execution, so the ForkJoinPool class can't use the work-stealing algorithm to increase the performance of the application. In this case, only when you call the join() or get() methods to wait for the finalization of a task, the ForkJoinPool class can use that algorithm.

To implement you need to do 2 things
1) Make a class that extends RecursiveTask or RecursiveAction and has following logic

If (problem size > size){
 tasks=Divide(task);
 execute(tasks);
 groupResults()
 return result;
} else {
 resolve problem;
 return result;
}
The difference is
  1. RecursiveTask : returns  some results
  2. RecursiveAction : doesn't return result


2) In main class call the above task


// Create a DivideAndConquerTask
DivideAndConquerTask task=new DivideAndConquerTask();
// Create a ForkJoinPool
ForkJoinPool pool=new ForkJoinPool();
// Execute the Task
pool.execute(task);
// Write statistics about the pool
do {
 System.out.printf("Main: Parallelism: %d\n",pool.getParallelism());
 System.out.printf("Main: Active Threads: %d\n",pool.getActiveThreadCount());
 System.out.printf("Main: Task Count: %d\n",pool.getQueuedTaskCount());
 System.out.printf("Main: Steal Count: %d\n",pool.getStealCount());
 try {
     TimeUnit.SECONDS.sleep(1);
 } catch (InterruptedException e) {
     e.printStackTrace();
 }
} while (!task.isDone());

Exceptions
The compute method doesnt throw checked exception , it can only throw RunTimeException
Other option is call completeExceptionally(Throwable ex)
Completes this task abnormally, and if not already aborted or cancelled, causes it to throw the given exception upon join and related operations

Cancelling Tasks
The ForkJoinTask class provides the cancel( ) method that allows you to cancel a task if it hasn't been executed yet. This is a very important point. If the task has begun its execution, a call to the cancel() method has no effect. The method receives a parameter as a Boolean value called mayInterruptIfRunning.  The Java API documentation specifies that, in the default implementation of the ForkJoinTask class, this attribute has no effect.

A limitation of the Fork/Join framework is that it doesn't allow the cancelation of all the tasks that are in ForkJoinPool. To overcome that limitation, you have implemented the TaskManager class. It stores all the tasks that have been sent to the pool. It has a method that cancels all the tasks it has stored. If a task can't be canceled because it's running or it has finished, the cancel() method returns the false value, so you can try to cancel all the tasks without being afraid of possible collateral effects.

Todo
https://www.igvita.com/2012/02/29/work-stealing-and-recursive-partitioning-with-fork-join/

No comments: