11 · CompletableFuture & Async Pipelines

Level: Intermediate

Pre-reading: 10 · Executor Framework


CompletableFuture Basics

Creating CompletableFuture

// Completed immediately with value
CompletableFuture<Integer> future = CompletableFuture.completedFuture(42);

// Run async, returns void
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
    System.out.println("Running async");
});

// Run async, returns value
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    return "Hello";
});

// Manual completion
CompletableFuture<Integer> manual = new CompletableFuture<>();
// ... later ...
manual.complete(99);  // or manual.completeExceptionally(exception);

Chaining Operations

thenApply() — Transform Result

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 5);

future.thenApply(x -> x * 2)        // 10
      .thenApply(x -> x + 1)        // 11
      .thenAccept(x -> System.out.println(x));  // 11

thenCombine() — Combine Two Futures

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 5);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 3);

future1.thenCombine(future2, (x, y) -> x + y)
       .thenAccept(sum -> System.out.println("Sum: " + sum));  // 8

thenCompose() — Flatten Nested Futures

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 5)
    .thenCompose(x -> CompletableFuture.supplyAsync(() -> x * 2));  // Flat 10

// vs thenApply would wrap it twice

anyOf() / allOf()

CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> 2);

// Returns when ANY completes
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2);

// Returns when ALL complete
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2);
all.join();  // Wait for all

Exception Handling

exceptionally()

CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("Oops");
})
.exceptionally(ex -> {
    System.err.println("Error: " + ex.getMessage());
    return 0;  // Default value
})
.thenAccept(x -> System.out.println("Got: " + x));

handle()

CompletableFuture.supplyAsync(() -> 42)
.handle((result, ex) -> {
    if (ex != null) {
        return 0;  // Error case
    }
    return result * 2;  // Success case
})
.thenAccept(x -> System.out.println(x));

Executing On Specific Executor

ExecutorService executor = Executors.newFixedThreadPool(4);

CompletableFuture.supplyAsync(() -> "data", executor)
    .thenApply(x -> x.toUpperCase())  // May run on different thread
    .thenApplyAsync(x -> process(x), executor)  // Explicitly on executor
    .thenAccept(x -> System.out.println(x));

Real-World Example

class DataFetcher {
    private ExecutorService executor = Executors.newFixedThreadPool(4);

    // Fetch user data from remote service
    CompletableFuture<User> fetchUser(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            return callRemoteUserService(userId);
        }, executor);
    }

    // Fetch user's friends
    CompletableFuture<List<Friend>> fetchFriends(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            return callRemoteFriendsService(userId);
        }, executor);
    }

    // Combined: fetch user and friends in parallel
    CompletableFuture<UserProfile> fetchUserProfile(int userId) {
        return fetchUser(userId)
            .thenCombine(
                fetchFriends(userId),
                (user, friends) -> new UserProfile(user, friends)
            );
    }

    // Usage
    void printProfile(int userId) {
        fetchUserProfile(userId)
            .exceptionally(ex -> {
                System.err.println("Failed to fetch profile: " + ex);
                return null;
            })
            .thenAccept(profile -> {
                if (profile != null) {
                    System.out.println(profile);
                }
            });
    }
}

Key Takeaways

  • CompletableFuture: Programmatic control over async operations
  • thenApply(): Transform result
  • thenCombine(): Combine two futures
  • thenCompose(): Chain dependent futures
  • exceptionally(): Error handling
  • Executor parameter: Control which thread runs the operation

📚 Read the Original Blog Post

For more details and examples, read:


What's the difference between thenApply and thenApplyAsync?

thenApply runs on the same thread that produced the value. thenApplyAsync runs on an executor thread (default ForkJoinPool).

When should I use CompletableFuture vs Future?

CompletableFuture adds composition, chaining, and manual completion. Use CompletableFuture for modern async code.

Can I cancel a CompletableFuture?

Yes, with complete()/completeExceptionally(), but not after it's already completed.