CompletableFuture Pipelines
In the old days of Java (pre-Java 8), handling asynchronous results meant dealing with Future.get(), which blocked the calling thread until the result was ready. This defeated the purpose of asynchronous programming if you had to wait anyway.
Enter CompletableFuture. It implements CompletionStage, allowing you to attach callbacks that execute when the future completes, without blocking the main thread. This enables you to build complex, non-blocking pipelines.
1. The Problem: Blocking vs. Non-Blocking
- Blocking (Future): “Here’s a ticket. Wait here until your order is ready.”
- Non-Blocking (CompletableFuture): “Here’s a beeper. Go do whatever you want; we’ll buzz you when it’s ready.”
2. Interactive: The Async Pipeline Builder
Visualize how data flows through a chain of asynchronous operations. Each stage depends on the previous one but executes independently once its dependency is met.
3. Code Patterns
Java: The Fluent API
Java uses a chainable API. Notice how exceptions are handled at the end, similar to a catch block.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class AsyncPipeline {
// Simulates an async API call
private static CompletableFuture<String> fetchUser(String id) {
return CompletableFuture.supplyAsync(() -> {
sleep(1000);
if (id.isEmpty()) throw new RuntimeException("Invalid ID");
return "{id: " + id + ", name: \"Alice\"}";
});
}
private static CompletableFuture<String> fetchOrders(String userJson) {
return CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "[Order#1, Order#2]";
});
}
private static void sleep(int ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { }
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> pipeline = fetchUser("123")
.thenApply(json -> {
System.out.println("Parsed: " + json);
return json; // Transformation
})
.thenCompose(user -> fetchOrders(user)) // Chain another async task (flatMap)
.thenAccept(orders -> System.out.println("UI Update: " + orders)) // Consumption
.exceptionally(ex -> {
System.err.println("Pipeline failed: " + ex.getMessage());
return null; // Recover or swallow
});
// Block main thread just for the example to finish
pipeline.get();
}
}
Go: Channels and Goroutines
Go doesn’t chain callbacks. Instead, it uses blocking calls inside lightweight goroutines and communicates results via channels or uses errgroup for coordination.
package main
import (
"fmt"
"time"
)
func fetchUser(id string) <-chan string {
c := make(chan string)
go func() {
time.Sleep(1 * time.Second)
c <- fmt.Sprintf(`{id: "%s", name: "Alice"}`, id)
}()
return c
}
func fetchOrders(userJson string) <-chan string {
c := make(chan string)
go func() {
time.Sleep(1 * time.Second)
c <- "[Order#1, Order#2]"
}()
return c
}
func main() {
// Step 1: Start async fetch
userChan := fetchUser("123")
// Main thread can do other work here...
// Step 2: Receive result (blocking point 1)
userJson := <-userChan
fmt.Println("Parsed:", userJson)
// Step 3: Start dependent async task
ordersChan := fetchOrders(userJson)
// Step 4: Receive result (blocking point 2)
orders := <-ordersChan
fmt.Println("UI Update:", orders)
}
[!NOTE] Go’s model is imperative and synchronous-looking, even though it executes concurrently. This avoids “callback hell” naturally.
4. Common Operations
| Operation | Method | Description |
|---|---|---|
| Transform | thenApply(Function) |
map: T → U (Synchronous) |
| Consume | thenAccept(Consumer) |
forEach: T → void (Side-effect) |
| Compose | thenCompose(Function) |
flatMap: T → CompletableFuture<U> (Async Chaining) |
| Combine | thenCombine(Future, BiFunction) |
Wait for both A and B, then combine results. |
| All Of | allOf(Future...) |
Wait for N futures to complete (Barrier). |
5. Summary
CompletableFuture brings powerful functional programming patterns to Java concurrency. It allows you to describe what should happen with the data once it arrives, rather than focusing on how to wait for it.