CompletableFutures & Asynchronous Programming
CompletableFuture
CompletableFutures in Java is the same as Promise in Javascript
Javascript code
Javascript is dynamically typed
Current State | Next State | Function called |
---|---|---|
resolved | resolved | next then in the pipeline |
resolved | rejected | next catch in the pipeline |
rejected | resolved | next then in the pipeline |
rejected | resolved | next catch in the pipeline |
Java
Java is statically typed, so we have to provide **the type of CompletableFuture ** in the declaration
CompletableFuture - ThreadPool
By Default, Completable future uses the Common ForkJoinPool.
- Which means that the number of threads in a common fork-join pool is equal to the number of cores in the machine
Runtime.getRuntime().availableProcessors()
Common ForkJoinPool is shared by
- ParallelStreams
- CompletableFuture
And thus, user defined thread pool is also an option to avoid for resource waiting scenarios arising from common thread pool.
CompletableFuture started on a different Pool (mypool) altogether.
- It’s a good idea to use a different Thread pool if the tasks are IO bound.
- The Fork Join Pool must be used only CPU intensive task
User-defined thread pool
int numberOfCores = Runtime.getRuntime().availableProcessors();
ExecutorService pool = Executors.newFixedThreadPool(numberOfCores);
//Usually the Thread pool would be created upfront
ExecutorService pool = Executors.newCachedThreadPool();
ForkJoinPool pool = new ForkJoinPool(10);
finally {
pool.close();
}
Pass the pool into async methods (thenApplyAsync, thenAcceptAsync)
ExecutorService pool = Executors.newCachedThreadPool();
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> compute(), pool);
CompletableFuture<Double> doubleCompletableFuture = future.thenApplyAsync(data -> data * 2, pool);
CompletableFuture<Void> voidCompletableFuture = doubleCompletableFuture.thenAcceptAsync(data -> getPrintln(data),pool);
Creating a new CompletableFuture
supplyAsync() - with return Data
CompletableFuture<V>
runAsync() - No Return data
CompletableFuture<Void>
new CompletableFuture<>()
Creating a pipeline and then completing
complete(T value):
- allows completing a
CompletableFuture
, manually, with a specific result value. - to provide a result explicitly, bypassing the actual asynchronous computation.
Stages of Completable futures
When one stage completes, another one starts and it keeps running
supplyAsync()
- Factory method
- used to initiate asynchronous computations (tasks)
- takes Supplier as the input
- returns
CompletableFuture<T>()
of type T
thenApply() & thenApplyAsync()
- Completion Stage method
- used for applying transformations, takes a Function
- thenApply deals with Function that returns a value
- returns
CompletableFuture<T>()
of TypeT
thenAccept() & thenAcceptAsync
- CompletionStage method
- used for chaining asynchronous tasks. Has the capability to use the results of previous async task and perform actions on it
- takes Consumer as the input
- returns
CompletableFuture<Void>()
type Void
Exceptionally
With the use of exceptionally if the execution of the task is
- OK, go to the next THEN
- exception, go to the next EXCEPTIONALLY,
- BUT with proper type handling. The return type of Exceptionally has to be of the proper type.
- Write the exception code generically and use that so that it aligns to the data type properly.
thenCombine()
Use Case : When there is a need to bring data from multiple microservices and combine them
- used to combine Independent Completable Futures (two asynchronous tasks)
- For Example : if a service makes 2 calls to independent services, then the total latency will be MAX(service1, service2) instead of SUM(service1, service2)
- Takes two arguments -
thenCombine(CompletionStage, BiFunction)
- CompletionStage,
- BiFunction
- Returns a CompletableFuture
thenCompose()
- compose() is used for transforming the result of one CompletableFuture into another CompletableFuture.
- used to chain two asynchronous operations where the second depends on the result of the first.
- The function provided to compose() maps the result of the first CompletableFuture to a new CompletableFuture.
- thenCompose depends on the completion of the dependent Future task
- Completion Stage method
- Input is a
Function
functional interface, Transform data from one form to another - returns **CompletableFuture
** here T is the type of the result of the second CompletableFuture. - The resulting CompletableFuture is a flattened chain.
compose() –> sequencing dependent asynchronous tasks, thenCombine() –> combine the results of two independent asynchronous tasks into a single result
join() - Blocking Until Completion
Ensuring All Steps Complete: The CompletableFuture operations you chain (e.g., thenApply, exceptionally, thenAccept, thenRun) will execute asynchronously.
If the main thread exits before these operations complete, you won’t see their
output.
join()
ensures that the main thread waits for the entire chain of operations
to finish.
- to obtain the result of the asynchronous computation when it’s done.
- similar to the
get()
method, but doesn’t throw checked exceptions. - waits indefinitely for the computation to finish
- returns the result
- or throws any unhandled exception if one occurs.
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
int result = future.join(); // Get the result when the computation is complete
//Blocks the main thread until the supplyAsync is done
get() & getNow() INSTEAD use thenAccept()
get() is a blocking call; The best thing to do with GET is to forGET. INSTEAD use thenAccept()
- like join(), the get() method is also used to obtain the result of the asynchronous computation when it’s done.
- Unlike join(), the get() method can throw checked exceptions, specifically
InterruptedException
andExecutionException
, which need to be handled. - use get() if there is a need to explicit handling for interruptions and want to differentiate between exceptions and interruptions.
-
if it’s so important to use get, use getNow() with a default value
- getNow() is impatient non-blocking and moves on with a value if there is no immediate response
- If there is delay prior to getNow call then the getNow may return the correct value.
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
try {
int result = future.get(); // Get the result and handle exceptions
int data = future.getNow(-99.0);//need to provide a value if the value is absent
future.thenAccept(data -> getPrintln(data));
} catch (InterruptedException e) {
System.out.println("Thread was interrupted");
Thread.currentThread().interrupt(); // Preserve interruption status
} catch (ExecutionException e) {
System.out.println("Caught exception: " + e.getCause()); // Print actual cause
}
TimeOut - 2 functions
completeOnTimeout
if the CompletableFuture doesn’t complete within the specified timeout, it will be completed with the provided default value.
private static void successOnTimeOut(CompletableFuture<Integer> future) {
future.completeOnTimeout(5, 1, TimeUnit.SECONDS);//Does not keep the pipeline in PENDING state
// for more than a second. the value doesn't arrive in 1 sec (timeout) then resolve it, via the default value
}
orTimeout
If the CompletableFuture times out, it is canceled, and the resulting CompletableFuture is considered completed exceptionally with a TimeoutException.
- it can interrupt the underlying task if it takes too long to complete.
private static void failureOnTimeOut(CompletableFuture<Integer> future) {
future.orTimeout(1, TimeUnit.SECONDS);//Does not keep the pipeline in PENDING state
//for more than a second. the value doesn't arrive in 1 sec (timeout) then cancel it, and completes it exceptionally with a TimeoutException
}
allOf()
Note :: allOf(..)
does not “wait” for all tasks to complete. It simply
returns a CompletableFuture
whenComplete()
whenComplete doesn’t allow for transformation of the result. it is primarily used for handling completion and any associated exceptions.
anyOf()
Returns the first one succeeded.
CompletableFuture
.anyOf(future1, future2, future3, future4)
.thenAccept(result -> {
System.out.println("Handling Accept :: " + result);
})
.exceptionally(throwable -> {
System.out.println("Handling Failure :: " + throwable);
return null;
})
.join();
completedFuture()
Purpose: This method creates a CompletableFuture that is already completed with the specified value.
Usage: It is used when you have a result that is already available and want to wrap it in a CompletableFuture without additional asynchronous computation.
Behavior: The resulting CompletableFuture is instantly completed with the given value, meaning that any operations on this CompletableFuture will execute immediately (or as soon as possible).
Streams API vs Async API
Functional Interface | Method | Streams API | Async API |
---|---|---|---|
Predicate |
boolean test() | filter() | |
Function<T,R> | R apply(T k) | map() | thenApply() |
Consumer |
void accept(T) | forEach() | thenAccept() |
Supplier |
T get() | Factories | supplyAsync() |
Streams vs CompletableFuture
Streams | CompletableFuture |
---|---|
Zreo, one or more data | zero or one |
only data channel | data channel or error channel |
pipeline & lazy | pipeline & lazy |
Exception - nope | error channel |
forEach | thenAccept (consumes data) |
map | thenApply - perform transformation |
((( zip ))) | thenCombine |
flatMap (returns Stream) | thenCompose (returns CompletableFuture) |