CompletableFuture Pipelines

In the old days of Java (pre-Java 8), handling asynchronous results meant dealing with Future.get(), which blocked the calling thread until the result was ready. This defeated the purpose of asynchronous programming if you had to wait anyway.

Enter CompletableFuture. It implements CompletionStage, allowing you to attach callbacks that execute when the future completes, without blocking the main thread. This enables you to build complex, non-blocking pipelines.

1. The Problem: Blocking vs. Non-Blocking

  • Blocking (Future): “Here’s a ticket. Wait here until your order is ready.”
  • Non-Blocking (CompletableFuture): “Here’s a beeper. Go do whatever you want; we’ll buzz you when it’s ready.”

2. Interactive: The Async Pipeline Builder

Visualize how data flows through a chain of asynchronous operations. Each stage depends on the previous one but executes independently once its dependency is met.

Waiting to start...

3. Code Patterns

Java: The Fluent API

Java uses a chainable API. Notice how exceptions are handled at the end, similar to a catch block.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AsyncPipeline {

    // Simulates an async API call
    private static CompletableFuture<String> fetchUser(String id) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            if (id.isEmpty()) throw new RuntimeException("Invalid ID");
            return "{id: " + id + ", name: \"Alice\"}";
        });
    }

    private static CompletableFuture<String> fetchOrders(String userJson) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            return "[Order#1, Order#2]";
        });
    }

    private static void sleep(int ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> pipeline = fetchUser("123")
            .thenApply(json -> {
                System.out.println("Parsed: " + json);
                return json; // Transformation
            })
            .thenCompose(user -> fetchOrders(user)) // Chain another async task (flatMap)
            .thenAccept(orders -> System.out.println("UI Update: " + orders)) // Consumption
            .exceptionally(ex -> {
                System.err.println("Pipeline failed: " + ex.getMessage());
                return null; // Recover or swallow
            });

        // Block main thread just for the example to finish
        pipeline.get();
    }
}

Go: Channels and Goroutines

Go doesn’t chain callbacks. Instead, it uses blocking calls inside lightweight goroutines and communicates results via channels or uses errgroup for coordination.

package main

import (
	"fmt"
	"time"
)

func fetchUser(id string) <-chan string {
	c := make(chan string)
	go func() {
		time.Sleep(1 * time.Second)
		c <- fmt.Sprintf(`{id: "%s", name: "Alice"}`, id)
	}()
	return c
}

func fetchOrders(userJson string) <-chan string {
	c := make(chan string)
	go func() {
		time.Sleep(1 * time.Second)
		c <- "[Order#1, Order#2]"
	}()
	return c
}

func main() {
	// Step 1: Start async fetch
	userChan := fetchUser("123")

	// Main thread can do other work here...

	// Step 2: Receive result (blocking point 1)
	userJson := <-userChan
	fmt.Println("Parsed:", userJson)

	// Step 3: Start dependent async task
	ordersChan := fetchOrders(userJson)

	// Step 4: Receive result (blocking point 2)
	orders := <-ordersChan
	fmt.Println("UI Update:", orders)
}

[!NOTE] Go’s model is imperative and synchronous-looking, even though it executes concurrently. This avoids “callback hell” naturally.

4. Common Operations

Operation Method Description
Transform thenApply(Function) map: T → U (Synchronous)
Consume thenAccept(Consumer) forEach: T → void (Side-effect)
Compose thenCompose(Function) flatMap: T → CompletableFuture<U> (Async Chaining)
Combine thenCombine(Future, BiFunction) Wait for both A and B, then combine results.
All Of allOf(Future...) Wait for N futures to complete (Barrier).

5. Summary

CompletableFuture brings powerful functional programming patterns to Java concurrency. It allows you to describe what should happen with the data once it arrives, rather than focusing on how to wait for it.