Thursday, November 9, 2017

Completable future (java 8) simple example

A long long time ago class Future was introduced in Java. Now, with Java 8 we have an "upgraded" version of it.

1. Regular "Future"

Futures are very useful objects: if some task which will be executed in another thread have to return some value - we can use Callable object which returns Futubre object. Later we will be able to get the value returned by task by executing .get method from returned Future. The problem here: when we are executing .get - we are blocked: if the value is not ready yet(long running task) we will be waiting till the value will finally be returned.

Let's write some code to recall our knowledge of Future:

Function for simulation of long running task:

public int longTask(int delay) {
    try {
        Thread.sleep(delay);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return delay;

}

Function for creation Callable from this long task:
public Callable<Integer> longCallable(int delay) {
    Callable<Integer> result = () -> longTask(delay);
    return result;
}


Function for submitting Callable by Executor and creation of Future:

public Future<Integer> longFuture(int delay) {
    return Executors.newWorkStealingPool().submit(longCallable(delay));
}

Debug function to print our Futures:

public void printFuture(Future<Integer> future) {
    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}

Let's create an array with several futures:

public List<Future<Integer>> getTestFutures() {
    return IntStream.rangeClosed(1, 3)
            .<Future<Integer>>mapToObj(i -> longFuture(i))
            .collect(Collectors.toList());
}

Now we have a Futures, so let's take the values from them:  

public void blockingFutureGet() {
    getTestFutures().forEach(f -> printFuture(f));
}


And, as we mentioned before - we are blocked here. 
After next execution:  

System.out.println("Begin");
app.blockingFutureGet();
System.out.println("End");


Result will be: 
Begin
1
2
3
End

So, the main thread was blocked till we finally got ALL VALUES from the Futures. 
Sometimes such behavior is not acceptable: we may need to execute tasks in totally async way. 
For that purpose we may rewrite previous function this way: 


public void nonBlockingFutureGet() {
    new Thread(this::blockingFutureGet).start();
}

We are running additional thread to get values from the Futures.
The result will be:
Begin
End
1
2
3

-  the main thread was not blocked - it continued execution while another thread was getting values from Futures. But this approach little bit complicated: we have to submit our tasks, collect the futures, run additional thread, pass collected futures there and execute all needed manipulations. To make it simpler we can just use CompletableFutures.

2. CompletableFuture

And now let's compare all the code above with CompletableFuture implementation:

public void completableFuture() {
    IntStream.rangeClosed(1, 3).forEach(i ->
        CompletableFuture.supplyAsync(() -> longTask(i))
           .thenAccept(System.out::println)
    );
}

It's now much easier!  Also results can be forwarded further for additional tasks/operations:

public void completableFuture2() {
    IntStream.rangeClosed(1, 3).forEach(i ->
            CompletableFuture.supplyAsync(() -> longTask(i))
                    .thenApply(v->v*10)
                    .thenApply(v->Integer.toString(v)+"!")
                    .thenAccept(System.out::println)
                    .exceptionally(e-> {
                        System.out.println("We have a problem:"+e.getMessage());
                        return null;
                    })
    );
}

3. Running additional asynchronous tasks using thenCompose

In previous example we used thenApply method which executes tasks in synchronous way. But it can be asynchronous - we can execute thenCompose method and return another async task:

public void cfThenCompose() {
    IntStream.rangeClosed(1, 3)
            .mapToObj(i -> CompletableFuture.supplyAsync(() -> longTask(i * 1000)))
            .forEach(f -> f.thenCompose(j -> {
                             System.out.println(j);
                             return CompletableFuture.supplyAsync(() -> longTask(j + 1));
                         }).thenAccept(System.out::println)
                     );
}


Results of execution:
1000
2000
1001
3000
2001
3001

4. Combining results of 2 independent futures.

In a previous example we were running execution based on previous execution.But what if need result of 2 tasks for further computation? We can use thenCombine method or that:

public void cfThenCombine() {
    CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(()->longTask(1000));
    CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(()->longTask(2000));
    f1.thenCombine(f2, (r1, r2)-> r1+r2).thenAccept(System.out::println);
}

The result is, as expected:
3000

5. The end.

CompletableFutures as the big step forward for java in the direction of functional programming.