深入解析Java 8引入的CompletableFuture,掌握异步编程的核心技术与实战应用
CompletableFuture概述
CompletableFuture是Java 8引入的一个强大的异步编程工具,它实现了Future和CompletionStage接口,提供了丰富的API来处理异步计算和流式操作。相比传统的Future,CompletableFuture具有以下优势:
- 支持显式完成(手动设置结果)
- 支持异步回调(当完成时触发操作)
- 支持流式编程和函数式组合
- 支持多个Future的组合操作
- 提供强大的异常处理机制
CompletableFuture的核心设计理念是将异步任务和回调函数以声明式的方式组合起来,使代码更加简洁和易读。
基本使用方法
CompletableFuture提供了多种创建方式,下面介绍最常用的几种方法:
1. 创建简单的CompletableFuture
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class BasicExample {
public static void main(String[] args) {
// 使用runAsync执行没有返回值的异步任务
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
System.out.println("异步任务执行中...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("异步任务执行完成");
});
// 使用supplyAsync执行有返回值的异步任务
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, CompletableFuture!";
});
// 获取结果(会阻塞当前线程)
try {
String result = future2.get();
System.out.println("获取到结果: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
2. 使用回调处理结果
public class CallbackExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟长时间运行的任务
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务完成";
});
// 使用thenApply转换结果
CompletableFuture<String> upperCaseFuture = future.thenApply(String::toUpperCase);
// 使用thenAccept消费结果
upperCaseFuture.thenAccept(result -> {
System.out.println("处理结果: " + result);
});
// 使用thenRun在完成后执行操作
upperCaseFuture.thenRun(() -> {
System.out.println("所有处理完成");
});
// 等待异步任务完成
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
异步操作与线程池
CompletableFuture默认使用ForkJoinPool.commonPool()作为执行异步任务的线程池,但我们也可以自定义线程池来更好地控制资源。
1. 使用自定义线程池
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExample {
public static void main(String[] args) {
// 创建自定义线程池
ExecutorService customThreadPool = Executors.newFixedThreadPool(5);
// 使用自定义线程池执行异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("线程: " + Thread.currentThread().getName());
return "结果";
}, customThreadPool);
// 处理结果
future.thenAcceptAsync(result -> {
System.out.println("处理线程: " + Thread.currentThread().getName());
System.out.println("处理结果: " + result);
}, customThreadPool);
// 等待任务完成
try {
future.get();
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
customThreadPool.shutdown();
}
}
}
2. 异步操作的执行线程
public class ThreadBehaviorExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("SupplyAsync 执行线程: " + Thread.currentThread().getName());
return "初始结果";
});
// thenApply会在上一个任务完成的同一个线程中执行
future.thenApply(result -> {
System.out.println("ThenApply 执行线程: " + Thread.currentThread().getName());
return result + " -> 转换1";
});
// thenApplyAsync会在另一个线程中异步执行
future.thenApplyAsync(result -> {
System.out.println("ThenApplyAsync 执行线程: " + Thread.currentThread().getName());
return result + " -> 异步转换";
});
// 等待任务完成
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
组合操作方法
CompletableFuture提供了多种组合操作的方法,可以处理多个异步任务之间的依赖关系。
1. thenCompose – 链式依赖
public class ThenComposeExample {
public static void main(String[] args) {
// 模拟用户ID查询
CompletableFuture<Integer> userIdFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1001;
});
// 基于用户ID查询用户详情(链式依赖)
CompletableFuture<String> userDetailFuture = userIdFuture.thenCompose(userId -> {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "用户详情 for ID: " + userId;
});
});
userDetailFuture.thenAccept(System.out::println);
// 等待任务完成
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2. thenCombine – 合并结果
public class ThenCombineExample {
public static void main(String[] args) {
// 异步获取用户基本信息
CompletableFuture<String> userInfoFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(400);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "用户基本信息";
});
// 异步获取用户订单信息
CompletableFuture<String> userOrdersFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(600);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "用户订单信息";
});
// 合并两个异步任务的结果
CompletableFuture<String> combinedFuture = userInfoFuture.thenCombine(userOrdersFuture,
(userInfo, userOrders) -> "整合结果: " + userInfo + " + " + userOrders);
combinedFuture.thenAccept(System.out::println);
// 等待任务完成
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3. allOf / anyOf – 多任务组合
public class MultipleFutureExample {
public static void main(String[] args) {
// 创建多个异步任务
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(300); } catch (InterruptedException e) {}
return "结果1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(500); } catch (InterruptedException e) {}
return "结果2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(200); } catch (InterruptedException e) {}
return "结果3";
});
// 等待所有任务完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
allFutures.thenRun(() -> {
System.out.println("所有任务已完成");
try {
System.out.println("Future1结果: " + future1.get());
System.out.println("Future2结果: " + future2.get());
System.out.println("Future3结果: " + future3.get());
} catch (Exception e) {
e.printStackTrace();
}
});
// 等待任意一个任务完成
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2, future3);
anyFuture.thenAccept(result -> {
System.out.println("第一个完成的任务结果: " + result);
});
// 等待任务完成
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
异常处理机制
CompletableFuture提供了多种处理异常的方式,使异步编程更加健壮。
1. 使用exceptionally处理异常
public class ExceptionallyExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("随机异常发生");
}
return "成功结果";
});
// 使用exceptionally处理异常,提供默认值
CompletableFuture<String> safeFuture = future.exceptionally(ex -> {
System.out.println("捕获到异常: " + ex.getMessage());
return "默认结果";
});
safeFuture.thenAccept(System.out::println);
// 等待任务完成
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2. 使用handle统一处理结果和异常
public class HandleExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("任务执行失败");
}
return "任务执行成功";
});
// 使用handle同时处理正常结果和异常
CompletableFuture<String> handledFuture = future.handle((result, ex) -> {
if (ex != null) {
return "异常处理: " + ex.getMessage();
}
return "结果处理: " + result;
});
handledFuture.thenAccept(System.out::println);
// 等待任务完成
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3. 使用whenComplete获取完成通知
public class WhenCompleteExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("执行过程中发生错误");
}
return "正常结果";
});
// 使用whenComplete获取完成通知(无论成功或失败)
future.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("任务失败: " + ex.getMessage());
} else {
System.out.println("任务成功: " + result);
}
});
// 等待任务完成
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
性能优化技巧
合理使用CompletableFuture可以显著提升应用性能,下面介绍一些优化技巧。
1. 避免不必要的阻塞
public class AvoidBlockingExample {
public static void main(String[] args) {
// 不推荐的写法:频繁调用get()导致阻塞
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "结果1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "结果2");
try {
String result1 = future1.get(); // 阻塞
String result2 = future2.get(); // 阻塞
System.out.println(result1 + " - " + result2);
} catch (Exception e) {
e.printStackTrace();
}
// 推荐的写法:使用非阻塞组合操作
CompletableFuture<String> combinedFuture = future1.thenCombine(future2,
(r1, r2) -> r1 + " - " + r2);
combinedFuture.thenAccept(System.out::println);
// 等待任务完成
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2. 合理使用线程池
import java.util.concurrent.*;
public class ThreadPoolOptimization {
// 根据任务类型使用不同的线程池
private static final ExecutorService ioBoundThreadPool =
Executors.newFixedThreadPool(50); // I/O密集型任务
private static final ExecutorService cpuBoundThreadPool =
Executors.newWorkStealingPool(); // CPU密集型任务
public static void main(String[] args) {
// I/O密集型任务使用较大的线程池
CompletableFuture<String> ioFuture = CompletableFuture.supplyAsync(() -> {
// 模拟I/O操作
try { Thread.sleep(100); } catch (InterruptedException e) {}
return "I/O操作结果";
}, ioBoundThreadPool);
// CPU密集型任务使用工作窃取线程池
CompletableFuture<Integer> cpuFuture = CompletableFuture.supplyAsync(() -> {
// 模拟CPU密集型计算
int sum = 0;
for (int i = 0; i < 1000000; i++) {
sum += i;
}
return sum;
}, cpuBoundThreadPool);
CompletableFuture.allOf(ioFuture, cpuFuture).join();
// 关闭线程池
ioBoundThreadPool.shutdown();
cpuBoundThreadPool.shutdown();
}
}
实战案例:电商价格查询系统
下面通过一个电商价格查询系统的案例,展示CompletableFuture在实际项目中的应用。
1. 系统需求
我们需要开发一个价格查询服务,能够同时从多个供应商获取商品价格,并返回最低价格。要求:
- 并行查询多个供应商
- 设置查询超时时间(2秒)
- 返回最先响应的3个供应商中的最低价格
- 优雅处理供应商服务不可用的情况
2. 实现代码
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class PriceQuerySystem {
// 模拟供应商服务
private static Map<String, SupplierService> suppliers = new HashMap<>();
static {
suppliers.put("supplierA", new SupplierService("A", 100, 1500));
suppliers.put("supplierB", new SupplierService("B", 200, 1200));
suppliers.put("supplierC", new SupplierService("C", 150, 1800));
suppliers.put("supplierD", new SupplierService("D", 300, 2000));
suppliers.put("supplierE", new SupplierService("E", 250, 1600));
}
// 供应商服务类
static class SupplierService {
private String name;
private int basePrice;
private int responseTime; // 响应时间(毫秒)
public SupplierService(String name, int basePrice, int responseTime) {
this.name = name;
this.basePrice = basePrice;
this.responseTime = responseTime;
}
public CompletableFuture<Optional<Integer>> getPrice(String productId) {
return CompletableFuture.supplyAsync(() -> {
try {
// 模拟网络延迟
Thread.sleep(responseTime);
// 模拟随机故障(10%概率失败)
if (Math.random() entry.getValue().getPrice(productId))
.collect(Collectors.toList());
// 创建一个CompletableFuture来接收最先完成的3个结果
CompletableFuture<List<Optional<Integer>>> firstThree = new CompletableFuture<>();
// 使用ExecutorService来管理超时
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// 设置超时(2秒)
scheduler.schedule(() -> {
firstThree.complete(new ArrayList<>());
}, 2000, TimeUnit.MILLISECONDS);
// 计数器,跟踪完成的任务数量
AtomicInteger completedCount = new AtomicInteger(0);
List<Optional<Integer>> results = Collections.synchronizedList(new ArrayList<>());
// 为每个价格查询任务添加回调
for (CompletableFuture<Optional<Integer>> future : priceFutures) {
future.whenComplete((price, ex) -> {
if (price != null && price.isPresent()) {
results.add(price);
if (completedCount.incrementAndGet() >= 3) {
firstThree.complete(results);
}
}
});
}
// 处理结果
return firstThree.thenApply(priceList -> {
scheduler.shutdown();
// 过滤掉空结果并找出最低价格
return priceList.stream()
.filter(Optional::isPresent)
.map(Optional::get)
.min(Integer::compareTo);
});
}
public static void main(String[] args) {
String productId = "product_123";
System.out.println("开始查询商品 " + productId + " 的最低价格...");
CompletableFuture<Optional<Integer>> bestPriceFuture = findBestPrice(productId);
bestPriceFuture.thenAccept(priceOpt -> {
if (priceOpt.isPresent()) {
System.out.println("找到最低价格: " + priceOpt.get());
} else {
System.out.println("无法获取价格,所有供应商都不可用或超时");
}
});
// 等待查询完成
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3. 系统优化点
- 使用并行查询提高响应速度
- 设置超时机制防止长时间等待
- 使用优雅降级处理供应商故障
- 采用最快响应策略提升用户体验
最佳实践与注意事项
在使用CompletableFuture时,遵循以下最佳实践可以避免常见陷阱:
1. 线程池管理
- 根据任务类型选择合适的线程池
- 避免使用默认的ForkJoinPool处理阻塞I/O操作
- 合理设置线程池大小,防止资源耗尽
- 记得关闭自定义线程池
2. 异常处理
- 始终处理可能的异常,避免静默失败
- 使用exceptionally或handle提供优雅降级
- 记录异常日志以便调试和监控
3. 性能考虑
- 避免不必要的阻塞操作,尽量使用回调
- 注意任务拆分的粒度,避免创建过多小任务
- 监控异步任务的执行时间和成功率
4. 代码可读性
- 合理使用方法链,但避免过长的链式调用
- 使用有意义的变量名和方法名
- 添加必要的注释说明异步流程
5. 常见陷阱
public class CommonPitfalls {
public static void main(String[] args) {
// 陷阱1:忘记处理异常
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("未处理的异常");
});
// 应该添加异常处理:future1.exceptionally(ex -> "默认值");
// 陷阱2:错误使用thenApply和thenApplyAsync
CompletableFuture.supplyAsync(() -> "结果")
.thenApply(result -> {
// 这个操作会在上一个任务的同一线程中执行
// 如果是阻塞操作,会影响整个流水线
return result + " 处理1";
})
.thenApplyAsync(result -> {
// 这个操作会在另一个线程中执行
// 适合用于阻塞或耗时操作
return result + " 处理2";
});
// 陷阱3:不必要的嵌套
CompletableFuture<CompletableFuture<String>> badNesting =
CompletableFuture.supplyAsync(() -> "结果")
.thenApply(result ->
CompletableFuture.supplyAsync(() -> result + " 嵌套"));
// 应该使用thenCompose代替
CompletableFuture<String> goodChaining =
CompletableFuture.supplyAsync(() -> "结果")
.thenCompose(result ->
CompletableFuture.supplyAsync(() -> result + " 链式"));
}
}