Java并发编程进阶:CompletableFuture原理与高并发实战
一、CompletableFuture核心机制
Java 8引入的异步编程利器:
// 基本创建方式
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Hello";
});
// 异步回调处理
future.thenApply(result -> result + " World")
.thenAccept(System.out::println)
.exceptionally(ex -> {
System.err.println("执行异常: " + ex.getMessage());
return null;
});
核心特性:非阻塞异步、链式调用、异常处理、组合操作
二、高级应用场景
1. 多任务并行执行
// 模拟三个独立服务调用
CompletableFuture<Double> priceFuture = getPriceAsync(productId);
CompletableFuture<Double> rateFuture = getRateAsync(currency);
CompletableFuture<Double> taxFuture = getTaxAsync(cityCode);
// 合并计算结果
CompletableFuture<Double> totalFuture = priceFuture
.thenCombine(rateFuture, (price, rate) -> price * rate)
.thenCombine(taxFuture, (subtotal, tax) -> subtotal * (1 + tax));
// 统一异常处理
totalFuture.whenComplete((result, ex) -> {
if (ex != null) {
System.err.println("计算失败: " + ex.getMessage());
} else {
System.out.println("最终价格: " + result);
}
});
2. 超时控制实现
// 带超时的异步调用
public static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
CompletableFuture<T> result = new CompletableFuture<>();
Delayer.delayer.schedule(() -> {
result.completeExceptionally(new TimeoutException());
}, timeout, unit);
return result;
}
// 使用示例
CompletableFuture<String> asyncTask = fetchDataAsync();
CompletableFuture<String> timeoutFuture = timeoutAfter(2, TimeUnit.SECONDS);
asyncTask.applyToEither(timeoutFuture, Function.identity())
.thenAccept(data -> System.out.println("获取数据: " + data))
.exceptionally(ex -> {
if (ex.getCause() instanceof TimeoutException) {
System.err.println("请求超时");
}
return null;
});
三、性能优化实战
1. 线程池精细化配置
// 自定义线程工厂
private static final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("async-pool-%d")
.setDaemon(true)
.build();
// 根据任务类型配置线程池
private static final ExecutorService ioBoundExecutor = new ThreadPoolExecutor(
16, 32, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
threadFactory,
new ThreadPoolExecutor.CallerRunsPolicy()
);
private static final ExecutorService cpuBoundExecutor = Executors.newWorkStealingPool();
// 使用指定线程池
CompletableFuture.supplyAsync(() -> {
// IO密集型任务
return queryDatabase();
}, ioBoundExecutor);
2. 异步任务缓存
// 基于Guava的异步缓存
private static final LoadingCache<String, CompletableFuture<Product>> productCache =
CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build(CacheLoader.from(key ->
CompletableFuture.supplyAsync(() -> queryProduct(key), ioBoundExecutor)
));
// 使用缓存
public CompletableFuture<Product> getProductAsync(String productId) {
try {
return productCache.get(productId);
} catch (ExecutionException e) {
return CompletableFuture.failedFuture(e);
}
}
四、电商系统实战案例
1. 订单支付流程编排
public CompletableFuture<OrderResult> processOrderAsync(Order order) {
// 1. 验证库存
return validateInventory(order.getItems())
// 2. 并行执行:创建支付记录+计算运费
.thenComposeAsync(valid -> {
CompletableFuture<Payment> paymentFuture = createPaymentAsync(order);
CompletableFuture<BigDecimal> shippingFuture = calculateShippingAsync(order);
return paymentFuture.thenCombine(shippingFuture, (payment, shipping) -> {
order.setShippingFee(shipping);
return payment;
});
}, orderExecutor)
// 3. 调用支付网关
.thenComposeAsync(this::callPaymentGateway, orderExecutor)
// 4. 更新订单状态
.thenComposeAsync(payment -> updateOrderStatus(order, payment), orderExecutor)
// 5. 发送通知(不阻塞主流程)
.whenCompleteAsync((result, ex) -> {
if (ex == null) {
sendNotifications(order, result).exceptionally(e -> {
log.error("通知发送失败", e);
return null;
});
}
}, notificationExecutor)
// 全局异常处理
.exceptionally(ex -> {
log.error("订单处理失败", ex);
rollbackOrder(order);
throw new OrderException("订单处理失败", ex);
});
}
五、最佳实践总结
- 合理命名线程池:便于问题排查
- 避免阻塞回调:保持异步特性
- 控制超时时间:防止无限等待
- 资源清理:及时关闭线程池
- 监控指标:跟踪任务执行情况