Java并发编程实战:CompletableFuture异步编排最佳实践
一、CompletableFuture核心优势
相比传统Future,CompletableFuture提供了:
- 显式完成控制(手动设置结果)
- 链式异步调用(thenApply/thenAccept)
- 组合多个Future(allOf/anyOf)
- 异常处理机制(exceptionally)
- 线程池灵活配置
二、电商订单系统案例
1. 业务场景分析
订单创建需要并行执行:
- 查询用户信息(IO密集型)
- 校验库存(IO密集型)
- 计算优惠(CPU密集型)
- 生成物流单(IO密集型)
2. 基础代码实现
// 自定义线程池配置 ExecutorService executor = Executors.newFixedThreadPool(4, new ThreadFactoryBuilder().setNameFormat("order-pool-%d").build()); public CompletableFuture<OrderResult> createOrder(OrderRequest request) { // 1. 并行执行任务 CompletableFuture<UserInfo> userFuture = CompletableFuture.supplyAsync( () -> userService.getUser(request.getUserId()), executor); CompletableFuture<StockCheck> stockFuture = CompletableFuture.supplyAsync( () -> stockService.check(request.getSkuId(), request.getQuantity()), executor); CompletableFuture<Discount> discountFuture = CompletableFuture.supplyAsync( () -> discountService.calculate(request), executor); // 2. 合并结果 return CompletableFuture.allOf(userFuture, stockFuture, discountFuture) .thenApplyAsync(v -> { // 3. 依赖前序结果 UserInfo user = userFuture.join(); StockCheck stock = stockFuture.join(); Discount discount = discountFuture.join(); // 4. 生成物流单(串行) return logisticsService.create( buildOrder(user, stock, discount)); }, executor); }
3. 异常处理增强
public CompletableFuture<OrderResult> safeCreateOrder(OrderRequest request) { return createOrder(request) .exceptionally(ex -> { log.error("订单创建失败", ex); if (ex instanceof StockException) { return OrderResult.fail("库存不足"); } return OrderResult.fail("系统繁忙"); }) .whenComplete((result, ex) -> { if (ex != null) { alertService.notifyAdmin(ex); } }); }
三、高级应用技巧
1. 超时控制
public static <T> CompletableFuture<T> withTimeout( CompletableFuture<T> future, long timeout, TimeUnit unit) { return future.applyToEither( CompletableFuture.supplyAsync(() -> { try { Thread.sleep(unit.toMillis(timeout)); } catch (InterruptedException e) {} throw new TimeoutException(); }), Function.identity() ); }
2. 结果缓存
private final Cache<String, CompletableFuture<OrderResult>> cache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build(); public CompletableFuture<OrderResult> getOrderAsync(String orderId) { return cache.get(orderId, id -> CompletableFuture.supplyAsync(() -> queryOrder(id), executor)); }
四、性能优化建议
- IO密集型任务与CPU密集型任务使用不同线程池
- 避免在主线程调用
join()
/get()
- 合理设置线程池参数(核心/最大线程数、队列容量)
- 使用
thenApplyAsync
指定后续操作线程池