Java Multithreading - Part 8: CompletableFuture Mastery

10 minute read

Part 8: CompletableFuture Mastery

CompletableFuture (Java 8+) revolutionizes asynchronous programming with non-blocking pipelines, composition, and elegant error handling.

Video Reference

CompletableFuture Tutorial

Table of Contents

  1. Why CompletableFuture?
  2. CompletableFuture = Java’s Promise
  3. Creating CompletableFutures
  4. Transformation Methods
  5. Combining Futures
  6. Exception Handling
  7. Controlling Thread Pools
  8. Timeout Handling
  9. Blocking Operations: join(), get(), getNow()
  10. Streams API vs Async API
  11. Best Practices

Why CompletableFuture?

Limitations of Future

Problem Description
Blocking get() Defeats the async purpose
No chaining Can’t compose (fetchUser → enrichUser → saveUser)
No callbacks Must manually poll or block
No combining Hard to combine results from multiple futures
Limited functionality Cannot complete a future manually
// Traditional Future - BLOCKING!
Future<User> userFuture = executor.submit(() -> fetchUser(id));
User user = userFuture.get();  // BLOCKS main thread!

CompletableFuture Solution

// Non-blocking, chainable, composable!
CompletableFuture.supplyAsync(() -> fetchUser(id))
    .thenApply(user -> enrichUser(user))           // Transform
    .thenCombine(fetchOrdersAsync(id), this::merge) // Combine
    .exceptionally(ex -> handleError(ex))          // Error handling
    .thenAccept(dto -> sendResponse(dto));         // Consume

Feature Comparison

Feature Future CompletableFuture
Blocking get() Yes, always Optional (use callbacks)
Chaining ❌ No ✅ thenApply, thenAccept, thenRun
Combining ❌ No ✅ thenCombine, allOf, anyOf
Error handling ❌ No callbacks ✅ exceptionally, handle
Completion Automatic only Manual completion possible

CompletableFuture = Java’s Promise

CompletableFutures in Java is the same as Promise in JavaScript.

JavaScript Comparison

// JavaScript Promise
getData()
    .then(data => transform(data))
    .then(result => display(result))
    .catch(error => handleError(error));

Java Equivalent

// Java CompletableFuture
CompletableFuture.supplyAsync(() -> getData())
    .thenApply(data -> transform(data))
    .thenAccept(result -> display(result))
    .exceptionally(error -> handleError(error));

Promise State Transitions

Current State Next State Function called
resolved resolved next then in pipeline
resolved rejected next catch in pipeline
rejected resolved next then in pipeline
rejected rejected next catch in pipeline

Creating CompletableFutures

supplyAsync() - With Return Data

Runs computation asynchronously and returns a result.

CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
    return computeValue();  // Returns result
});
// Type: CompletableFuture<V>

runAsync() - No Return Data

Fire and forget - runs computation with no result.

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    doWork();  // No return value
});
// Type: CompletableFuture<Void>

completedFuture() - Immediate Value

Creates an already-completed future.

CompletableFuture<String> immediate = CompletableFuture.completedFuture("Done!");
// Instantly completed, operations execute immediately

Manual Completion

Create incomplete future, complete later:

CompletableFuture<String> future = new CompletableFuture<>();

// Later, in another thread...
future.complete(result);              // Normal completion
future.completeExceptionally(ex);     // Error completion

complete(T value)

  • Allows completing a CompletableFuture manually with a specific result value
  • Used to provide a result explicitly, bypassing the actual asynchronous computation

📁 Code: bFuturesAndCompletableFutures/completableFutureBasics/A1Intro.java


Transformation Methods

Stages of CompletableFutures

When one stage completes, another one starts and it keeps running.

Stage Method Type Functional Interface Description
supplyAsync() Factory Supplier Initiates async computation, returns CompletableFuture<T>
thenApply() / thenApplyAsync() Completion Stage Function Transform data, returns value of type T
thenAccept() / thenAcceptAsync() Completion Stage Consumer Consume result, returns CompletableFuture<Void>

Pipeline Visualization

supplyAsync() ──▶ thenApply() ──▶ thenCombine() ──▶ thenAccept()
     │                │                │                │
   Async          Transform        Combine          Consume
   Start          (Function)      (BiFunction)     (Consumer)

thenApply() - Transform Result

Like map() in streams. Takes a Function<T, U>, returns CompletableFuture<U>.

CompletableFuture.supplyAsync(() -> 42)
    .thenApply(num -> num * 2)           // 84
    .thenApply(num -> "Result: " + num); // "Result: 84"

thenAccept() - Consume Result

Like forEach() in streams. Takes a Consumer<T>, returns CompletableFuture<Void>.

CompletableFuture.supplyAsync(() -> "Hello")
    .thenAccept(greeting -> System.out.println(greeting));

thenRun() - Run Action (Ignores Input)

Executes action after completion, ignoring the result.

CompletableFuture.supplyAsync(() -> compute())
    .thenRun(() -> System.out.println("Done!"));

Method Summary

Method Input Output Use Case
thenApply Function<T,U> CF Transform result
thenAccept Consumer CF Consume result
thenRun Runnable CF Side effect, ignore result

📁 Code: bFuturesAndCompletableFutures/completableFutureBasics/A3Pipeline.java


Combining Futures

thenCompose() - Flatten Nested Futures (flatMap)

Chains dependent async operations. Prevents nested CompletableFuture<CompletableFuture<T>>.

// ❌ BAD: thenApply creates nested CF<CF<User>>
cf.thenApply(id -> fetchUserAsync(id));  // Returns CF<CF<User>>

// ✅ GOOD: thenCompose flattens to CF<User>
cf.thenCompose(id -> fetchUserAsync(id));  // Returns CF<User>

Use Case: When second operation depends on first result.

Key Distinction:

  • compose() → sequencing dependent asynchronous tasks
  • thenCombine() → combine the results of two independent asynchronous tasks into a single result

thenCombine() - Combine Independent Futures

Runs two futures in parallel, combines results when both complete.

CompletableFuture<String> greeting = supplyAsync(() -> "Hello");
CompletableFuture<String> name = supplyAsync(() -> "World");

CompletableFuture<String> result = greeting.thenCombine(name, 
    (g, n) -> g + " " + n);  // "Hello World"
// Both run in parallel, total time = max(greeting, name)

Use Case: Microservice calls - bring data from multiple services and combine.

allOf() - Wait for All

Returns CompletableFuture<Void> that completes when all complete.

CompletableFuture<Void> allDone = CompletableFuture.allOf(f1, f2, f3);

allDone.thenRun(() -> {
    // All completed - get results manually
    String r1 = f1.join();
    String r2 = f2.join();
    String r3 = f3.join();
});

Note: allOf() does not “wait” - it returns a CompletableFuture immediately.

anyOf() - First to Complete

Returns CompletableFuture<Object> with result of first completed. Returns the first one succeeded.

CompletableFuture<Object> first = CompletableFuture.anyOf(fast, slow);
// Returns result of whichever completes first

Full example with error handling:

CompletableFuture
        .anyOf(future1, future2, future3, future4)
        .thenAccept(result -> {
            System.out.println("Handling Accept :: " + result);
        })
        .exceptionally(throwable -> {
            System.out.println("Handling Failure :: " + throwable);
            return null;
        })
        .join();

Summary

Method Description Return Type
thenCompose Chain dependent futures (flatMap) CF
thenCombine Combine 2 parallel futures CF
allOf Wait for all futures CF
anyOf First to complete CF

📁 Code: bFuturesAndCompletableFutures/completableFutureBasics/A12ThenCompose.java


Exception Handling

Railway Track Pattern

data track  -----f------f     recovering from exception       f--continue-----
                          \                                  /
error track ----------------f---can return default data-----f--or handle------

exceptionally() - Handle and Recover

Called only on exception. Provides fallback value.

CompletableFuture.supplyAsync(() -> riskyOperation())
    .exceptionally(ex -> {
        log.error("Failed", ex);
        return fallbackValue;  // Recovery value
    });

Pipeline Behavior: Exceptions skip stages until exceptionally:

future
    .thenApply(data -> 5 / data)       // May throw
    .exceptionally(ex -> 0)            // Recover with 0
    .thenApply(data -> data * 2)       // Continues with recovered value
    .thenAccept(System.out::println);

handle() - Handle Both Success and Failure

Called always (success or failure). Can transform or recover.

cf.handle((result, error) -> {
    if (error != null) {
        return "Recovered from: " + error.getMessage();
    }
    return result;
});

whenComplete() - Side Effect Only

Called always, but doesn’t change result. Good for logging/cleanup.

cf.whenComplete((result, error) -> {
    if (error != null) logger.error("Failed", error);
    else logger.info("Success: " + result);
});  // Original result or exception passes through

Comparison

Method Called When Returns Use Case
exceptionally Error only Fallback T Recovery
handle Always New T Transform either case
whenComplete Always Same T Logging, cleanup

📁 Code: bFuturesAndCompletableFutures/completableFutureBasics/A5Exceptionally.java


Controlling Thread Pools

Default Pool

By default, uses ForkJoinPool.commonPool():

CompletableFuture.supplyAsync(() -> task());  // Uses commonPool()
// Common pool size = CPU cores - 1
// ⚠️ Shared across entire JVM!

Common ForkJoinPool is shared by:

  • ParallelStreams
  • CompletableFuture

Custom Thread Pool

For I/O-bound tasks, use a separate pool:

int cores = Runtime.getRuntime().availableProcessors();
ExecutorService ioPool = Executors.newCachedThreadPool();
ExecutorService cpuPool = Executors.newFixedThreadPool(cores);
ForkJoinPool customFJP = new ForkJoinPool(10);

// Use custom pool
CompletableFuture.supplyAsync(() -> "Hello", ioPool);

// For transformations too
future.thenApplyAsync(s -> s.toUpperCase(), cpuPool);

Async Variants

Method Thread
thenApply May use same thread
thenApplyAsync New thread from common pool
thenApplyAsync(fn, executor) New thread from custom pool

Best Practice: Separate Pools

ExecutorService ioPool = Executors.newCachedThreadPool();      // I/O
ExecutorService cpuPool = Executors.newFixedThreadPool(cores); // CPU

cf.supplyAsync(() -> fetchFromDb(), ioPool)        // I/O
  .thenApplyAsync(data -> transform(data), cpuPool)   // CPU
  .thenAcceptAsync(result -> saveToDb(result), ioPool); // I/O

Timeout Handling

completeOnTimeout (Java 9+)

Complete with default value on timeout:

future.completeOnTimeout(defaultValue, 5, TimeUnit.SECONDS);
// If not complete in 5 seconds, complete with defaultValue

orTimeout (Java 9+)

Fail on timeout:

future.orTimeout(1, TimeUnit.SECONDS);
// Throws TimeoutException if not complete in 1 second

Blocking Operations: join(), get(), getNow()

Operation Description
join() Waits and returns result, wraps exceptions in CompletionException (unchecked)
get() Waits and returns result, throws checked exceptions (InterruptedException, ExecutionException)
getNow(value) Returns immediately with value or default, does not block

join() - Blocking Until Completion

Ensuring All Steps Complete: The CompletableFuture operations you chain (e.g., thenApply, exceptionally, thenAccept, thenRun) will execute asynchronously.

If the main thread exits before these operations complete, you won’t see their output. join() ensures that the main thread waits for the entire chain of operations to finish.

  • Used to obtain the result of the asynchronous computation when it’s done
  • Similar to get() method, but doesn’t throw checked exceptions
  • Waits indefinitely for the computation to finish
    • Returns the result
    • Or throws any unhandled exception if one occurs
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
int result = future.join(); // Get the result when the computation is complete
// Blocks the main thread until the supplyAsync is done

get() & getNow() - INSTEAD use thenAccept()

get() is a blocking call; The best thing to do with GET is to forGET. INSTEAD use thenAccept()

  • Like join(), get() is used to obtain the result of the asynchronous computation when it’s done
  • Unlike join(), get() can throw checked exceptions: InterruptedException and ExecutionException
  • Use get() if there is a need for explicit handling for interruptions and want to differentiate between exceptions and interruptions
  • If it’s so important to use get, use getNow() with a default value:
    • getNow() is impatient, non-blocking and moves on with a value if there is no immediate response
    • If there is delay prior to getNow call then the getNow may return the correct value
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
try {
    int result = future.get(); // Get the result and handle exceptions
    int data = future.getNow(-99); // Need to provide a value if the value is absent
    future.thenAccept(data -> System.out.println(data));
} catch (InterruptedException e) {
    System.out.println("Thread was interrupted");
    Thread.currentThread().interrupt(); // Preserve interruption status
} catch (ExecutionException e) {
    System.out.println("Caught exception: " + e.getCause()); // Print actual cause
}

Usage Recommendations

  • Prefer join() for a cleaner, unchecked exception handling
  • Use getNow(defaultValue) for a non-blocking way to retrieve results with a default
  • Use thenAccept() instead of blocking when possible

Streams API vs Async API

Functional Interface Method Streams API Async API
Predicate boolean test() filter() -
Function<T,R> R apply(T k) map() thenApply()
Consumer void accept(T) forEach() thenAccept()
Supplier T get() Factories supplyAsync()

Streams vs CompletableFuture

Streams CompletableFuture
Zero, one or more data Zero or one
Only data channel Data channel + error channel
Pipeline & lazy Pipeline & lazy
Exception - nope Error channel (exceptionally)
forEach thenAccept (consumes)
map thenApply (transform)
flatMap (returns Stream) thenCompose (returns CF)
- thenCombine (like zip)

Best Practices

1. Avoid Blocking Inside Async

// ❌ BAD - Blocking defeats async
CompletableFuture.supplyAsync(() -> future.get());

// ✅ GOOD - Keep chain async
CompletableFuture.supplyAsync(() -> task())
    .thenCompose(result -> anotherAsyncTask(result));

2. Don’t Ignore Exceptions

// ❌ BAD - Silent failure
CompletableFuture.runAsync(() -> riskyOp());

// ✅ GOOD - Handle errors
CompletableFuture.runAsync(() -> riskyOp())
    .exceptionally(ex -> { log.error(ex); return null; });

3. Use join() Over get()

// get() forces checked exception handling
try { future.get(); } 
catch (InterruptedException | ExecutionException e) { }

// join() wraps in unchecked exception - cleaner
future.join();  // Throws CompletionException (unchecked)

4. Use getNow() Instead of get()

// getNow() is non-blocking with default
int data = future.getNow(-99);  // Returns -99 if not done

5. Prefer Composition Over Blocking

// ❌ BAD - Sequential blocking
User user = fetchUser().get();
List<Order> orders = fetchOrders(user.getId()).get();

// ✅ GOOD - Composed async
fetchUser()
    .thenCompose(user -> fetchOrders(user.getId()))
    .thenCompose(orders -> fetchProducts(orders));

Summary

CompletableFuture = Java’s Promise
supplyAsync returns value, runAsync doesn’t
thenApply transforms, thenCompose flattens, thenCombine merges
exceptionally recovers, handle transforms both cases
Use custom pools for I/O-bound tasks
Prefer join() over get(), getNow() for non-blocking
orTimeout/completeOnTimeout for timeout handling (Java 9+)

Quick Reference

// Create
supplyAsync(() -> value);
runAsync(() -> action());
completedFuture(value);

// Transform
.thenApply(x -> transform(x))
.thenAccept(x -> consume(x))
.thenRun(() -> action())

// Combine
.thenCompose(x -> asyncOp(x))        // Flatten (flatMap)
.thenCombine(other, (a,b) -> merge)  // Parallel combine
CompletableFuture.allOf(f1, f2);     // Wait all
CompletableFuture.anyOf(f1, f2);     // First wins

// Errors
.exceptionally(ex -> fallback)        // Recover on error
.handle((r, ex) -> result)            // Handle both
.whenComplete((r, ex) -> log())       // Side effect

// Custom pool
.supplyAsync(task, executor)
.thenApplyAsync(fn, executor)

// Get result
.join()                               // Preferred (unchecked)
.get()                                // Checked exceptions
.orTimeout(5, SECONDS)                // Java 9+
.completeOnTimeout(default, 5, SECONDS)

Original Code Gists (Reference)

The following GitHub Gists contain the original code examples for this topic:

Topic Gist Link
JavaScript Promise Example
Java CompletableFuture Basic
supplyAsync()
runAsync()
Manual Pipeline Creation
complete(T value)
exceptionally()
thenCombine()
allOf()
whenComplete()

Next: Part 9: Concurrent Collections →