发布日期:2024年1月20日 | 作者:Java架构师
一、CompletableFuture概述
Java 8引入的CompletableFuture是并发编程的重要里程碑,它提供了强大的异步编程能力,能够显著提升应用程序的响应性和吞吐量。相比传统的Future,CompletableFuture支持链式调用、组合操作和异常处理等高级特性。
CompletableFuture的核心优势
- 非阻塞操作:避免线程等待,提高CPU利用率
- 链式编程:支持函数式编程风格,代码更简洁
- 灵活的组合:可以轻松组合多个异步任务
- 完善的异常处理:提供多种异常处理机制
- 与Stream API完美结合:充分发挥Java 8的函数式特性
二、基础用法详解
创建CompletableFuture
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Shows the two basic ways of starting a {@link CompletableFuture}: on the
 * default asynchronous executor and on a caller-created thread pool.
 */
public class BasicUsage {

    /**
     * Starts a task on the default asynchronous executor (normally
     * {@code ForkJoinPool.commonPool()}).
     *
     * @return a future that completes with a fixed message after about one second
     */
    public static CompletableFuture<String> createSimpleFuture() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000); // simulate a slow operation
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still see the interrupt.
                Thread.currentThread().interrupt();
            }
            return "任务执行完成";
        });
    }

    /**
     * Starts a task on a dedicated fixed-size thread pool.
     *
     * @return a future that completes with a fixed message
     */
    public static CompletableFuture<String> createFutureWithCustomExecutor() {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            return CompletableFuture.supplyAsync(() -> {
                // run the asynchronous task
                return "使用自定义线程池执行任务";
            }, executor);
        } finally {
            // shutdown() lets the task submitted above finish but releases the
            // pool's non-daemon threads afterwards; without it every call leaks
            // a pool and keeps the JVM alive.
            executor.shutdown();
        }
    }

    /**
     * Runs the simple example and prints its result.
     *
     * @param args unused
     * @throws Exception if waiting for the result is interrupted or the task fails
     */
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = createSimpleFuture();
        String result = future.get(); // blocks until the result is available
        System.out.println(result);
    }
}
结果处理与转换
/**
 * Shows how to transform and chain results with {@code thenApply},
 * {@code thenCompose} and {@code thenAccept}.
 */
public class ResultProcessing {

    /**
     * Builds "Hello" asynchronously, appends " World", upper-cases the text and
     * prints it once the chain completes. Returns without waiting for the output.
     */
    public static void demonstrateThenApply() {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
                .thenApply(s -> s + " World")     // transformation applied to the result
                .thenApply(String::toUpperCase);  // same, written as a method reference
        future.thenAccept(System.out::println);   // consume the final value
    }

    /**
     * Produces a user id asynchronously, feeds it into a second asynchronous
     * lookup and prints the details. Returns without waiting for the output.
     */
    public static void demonstrateThenCompose() {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "用户ID:123")
                .thenCompose(userId -> getUserDetails(userId)); // chain a dependent async call
        future.thenAccept(details ->
                System.out.println("用户详情: " + details));
    }

    /**
     * Simulates an asynchronous lookup of a user's details.
     *
     * @param userId text identifying the user; it is prepended to the result
     * @return a future holding the user id followed by canned profile data
     */
    private static CompletableFuture<String> getUserDetails(String userId) {
        return CompletableFuture.supplyAsync(() -> {
            // stands in for a database query
            return userId + " - 姓名: 张三, 年龄: 25";
        });
    }
}
三、组合操作方法
多任务组合执行
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
 * Shows how to combine several independent futures with {@code allOf} and
 * {@code anyOf}.
 */
public class CombinationMethods {

    /**
     * Starts three tasks that take 1, 2 and 3 seconds and prints all three
     * results once every task has completed. Returns without waiting.
     */
    public static void allOfExample() {
        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
            sleep(1);
            return "任务1结果";
        });
        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
            sleep(2);
            return "任务2结果";
        });
        CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
            sleep(3);
            return "任务3结果";
        });

        CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);
        allTasks.thenRun(() -> {
            System.out.println("所有任务完成");
            // join() cannot block here: allOf only completes after all three have.
            System.out.println("任务1: " + task1.join());
            System.out.println("任务2: " + task2.join());
            System.out.println("任务3: " + task3.join());
        });
    }

    /**
     * Starts a fast (1 s) and a slow (5 s) task and prints the result of
     * whichever completes first. Returns without waiting.
     */
    public static void anyOfExample() {
        CompletableFuture<String> fastTask = CompletableFuture.supplyAsync(() -> {
            sleep(1);
            return "快速任务";
        });
        CompletableFuture<String> slowTask = CompletableFuture.supplyAsync(() -> {
            sleep(5);
            return "慢速任务";
        });

        // NOTE(review): the original listing was cut off right after the
        // "CompletableFuture" token that started this statement. The rest of
        // this method and the sleep helper below are a reconstruction - confirm
        // against the author's original text.
        CompletableFuture<Object> firstCompleted = CompletableFuture.anyOf(fastTask, slowTask);
        firstCompleted.thenAccept(result -> System.out.println("最先完成的任务: " + result));
    }

    /**
     * Pauses the calling thread, restoring the interrupt flag if interrupted.
     *
     * @param seconds how long to pause, in seconds
     */
    private static void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
四、实战案例:电商订单处理系统
下面通过一个完整的电商订单处理系统来演示CompletableFuture在实际项目中的应用。
系统架构设计
/**
 * E-commerce order processing service.
 *
 * <p>Runs an order through validation, payment, inventory update, shipping
 * and notification as one asynchronous {@link CompletableFuture} pipeline.
 * A failure in any stage is converted into an unsuccessful {@code OrderResult}
 * instead of an exceptionally completed future.
 */
public class OrderProcessingService {
    private final InventoryService inventoryService;
    private final UserService userService;
    private final PaymentService paymentService;
    private final ShippingService shippingService;
    private final NotificationService notificationService;
    private final ExecutorService executor;

    /** Creates the service with its own collaborators and a 20-thread pool. */
    public OrderProcessingService() {
        this.inventoryService = new InventoryService();
        this.userService = new UserService();
        this.paymentService = new PaymentService();
        this.shippingService = new ShippingService();
        this.notificationService = new NotificationService();
        this.executor = Executors.newFixedThreadPool(20);
    }

    /**
     * Processes an order asynchronously.
     *
     * @param order the order to process
     * @return a future holding the outcome; on failure it holds an
     *         unsuccessful result carrying the error message
     */
    public CompletableFuture<OrderResult> processOrderAsync(Order order) {
        return CompletableFuture.supplyAsync(() -> {
                    System.out.println("开始处理订单: " + order.getId());
                    return order;
                }, executor)
                .thenCompose(this::validateOrder)
                .thenCompose(this::processPayment)
                .thenCompose(this::updateInventory)
                .thenCompose(this::arrangeShipping)
                .thenCompose(this::sendNotifications)
                .exceptionally(this::handleOrderException);
    }

    /**
     * Stops accepting new work on the internal pool; tasks already submitted
     * still run. Call this when the service is no longer needed, otherwise the
     * pool's threads stay alive.
     */
    public void shutdown() {
        executor.shutdown();
    }

    /**
     * Checks the user and the inventory in parallel.
     *
     * @return a future holding the order when both checks pass; it fails with
     *         a {@link RuntimeException} otherwise
     */
    private CompletableFuture<Order> validateOrder(Order order) {
        return CompletableFuture.supplyAsync(() -> {
            // Both checks are started from an executor thread and run in parallel.
            // NOTE(review): assumes both service methods return
            // CompletableFuture<Boolean>, as the && below requires - confirm.
            CompletableFuture<Boolean> userValidation =
                    userService.validateUserAsync(order.getUserId());
            CompletableFuture<Boolean> inventoryCheck =
                    inventoryService.checkInventoryAsync(order.getItems());
            return CompletableFuture.allOf(userValidation, inventoryCheck)
                    .thenApply(v -> {
                        if (userValidation.join() && inventoryCheck.join()) {
                            return order;
                        }
                        throw new RuntimeException("订单验证失败");
                    });
        }, executor).thenCompose(future -> future); // flatten the nested future
    }

    /** Charges the order and marks it {@code PAID}; fails if the payment is not successful. */
    private CompletableFuture<Order> processPayment(Order order) {
        return paymentService.processPaymentAsync(order)
                .thenApply(paymentResult -> {
                    if (paymentResult.isSuccess()) {
                        order.setStatus(OrderStatus.PAID);
                        return order;
                    }
                    throw new RuntimeException("支付处理失败");
                });
    }

    /** Updates stock for the order's items and marks it {@code INVENTORY_UPDATED}. */
    private CompletableFuture<Order> updateInventory(Order order) {
        return inventoryService.updateInventoryAsync(order.getItems())
                .thenApply(inventoryResult -> {
                    order.setStatus(OrderStatus.INVENTORY_UPDATED);
                    return order;
                });
    }

    /** Arranges shipping, stores the shipping info on the order and marks it {@code SHIPPING_ARRANGED}. */
    private CompletableFuture<Order> arrangeShipping(Order order) {
        return shippingService.arrangeShippingAsync(order)
                .thenApply(shippingInfo -> {
                    order.setShippingInfo(shippingInfo);
                    order.setStatus(OrderStatus.SHIPPING_ARRANGED);
                    return order;
                });
    }

    /**
     * Sends the e-mail and SMS notifications in parallel and, once both are
     * done, marks the order {@code COMPLETED}.
     *
     * @return a future holding the successful result
     */
    private CompletableFuture<OrderResult> sendNotifications(Order order) {
        CompletableFuture<?> emailNotification = notificationService.sendEmailAsync(order);
        CompletableFuture<?> smsNotification = notificationService.sendSMSAsync(order);
        return CompletableFuture.allOf(emailNotification, smsNotification)
                .thenApply(v -> {
                    order.setStatus(OrderStatus.COMPLETED);
                    return new OrderResult(order, true, "订单处理完成");
                });
    }

    /**
     * Turns a pipeline failure into an unsuccessful result.
     *
     * @param ex the failure reported by the pipeline
     * @return a result with no order, {@code success == false} and the error message
     */
    private OrderResult handleOrderException(Throwable ex) {
        System.err.println("订单处理异常: " + ex.getMessage());
        return new OrderResult(null, false, "订单处理失败: " + ex.getMessage());
    }
}
/**
 * Immutable outcome of processing one order: the order itself, whether
 * processing succeeded, and a human-readable message.
 */
class OrderResult {
    private final Order order;
    private final boolean success;
    private final String message;

    /**
     * @param order   the processed order; callers in this file pass
     *                {@code null} when the failure is not tied to an order
     * @param success whether processing succeeded
     * @param message description of the outcome
     */
    public OrderResult(Order order, boolean success, String message) {
        this.order = order;
        this.success = success;
        this.message = message;
    }

    /** @return the processed order, possibly {@code null} */
    public Order getOrder() {
        return order;
    }

    /** @return {@code true} if processing succeeded */
    public boolean isSuccess() {
        return success;
    }

    /** @return the description of the outcome */
    public String getMessage() {
        return message;
    }
}
/** States an order can be in while it moves through processing. */
enum OrderStatus {
    CREATED,
    PAID,
    INVENTORY_UPDATED,
    SHIPPING_ARRANGED,
    COMPLETED
}
批量订单处理
/**
* 批量订单处理器
* 使用CompletableFuture处理大量并发订单
*/
public class BatchOrderProcessor {
private final OrderProcessingService orderService;
private final ExecutorService batchExecutor;
public BatchOrderProcessor() {
this.orderService = new OrderProcessingService();
this.batchExecutor = Executors.newFixedThreadPool(50);
}
/**
* 并行处理批量订单
*/
public CompletableFuture<List> processBatchOrders(List orders) {
// 为每个订单创建处理任务
List<CompletableFuture> futures = orders.stream()
.map(order -> orderService.processOrderAsync(order))
.collect(Collectors.toList());
// 等待所有订单处理完成
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
/**
* 带限流的批量处理
*/
public CompletableFuture<List> processWithRateLimit(List orders, int batchSize) {
List<List> batches = partitionList(orders, batchSize);
List<CompletableFuture<List>> batchFutures = batches.stream()
.map(batch -> CompletableFuture.supplyAsync(() ->
processBatchSync(batch), batchExecutor))
.collect(Collectors.toList());
return CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> batchFutures.stream()
.map(CompletableFuture::join)
.flatMap(List::stream)
.collect(Collectors.toList()));
}
private List processBatchSync(List batch) {
return batch.stream()
.map(order -> {
try {
return orderService.processOrderAsync(order).get(30, TimeUnit.SECONDS);
} catch (Exception e) {
return new OrderResult(order, false, "处理超时或异常");
}
})
.collect(Collectors.toList());
}
private List<List> partitionList(List list, int size) {
List<List> partitions = new ArrayList();
for (int i = 0; i < list.size(); i += size) {
partitions.add(list.subList(i, Math.min(i + size, list.size())));
}
return partitions;
}
}
五、性能优化策略
线程池配置优化
/**
 * Factory helpers for choosing a thread pool by workload type and for
 * creating named daemon threads.
 */
public class ThreadPoolOptimizer {

    /**
     * Creates a thread pool suited to the given kind of task.
     *
     * @param taskType {@code "CPU_INTENSIVE"}, {@code "IO_INTENSIVE"} or
     *                 {@code "MIXED"}; any other value, including
     *                 {@code null}, selects a cached thread pool
     * @return a new executor; the caller is responsible for shutting it down
     */
    public static ExecutorService createOptimizedExecutor(String taskType) {
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        if (taskType == null) {
            // switch on a null String throws NullPointerException; treat null
            // like any other unknown type instead.
            return Executors.newCachedThreadPool();
        }
        switch (taskType) {
            case "CPU_INTENSIVE":
                // CPU-bound work: more threads than cores only adds contention
                return Executors.newFixedThreadPool(availableProcessors);
            case "IO_INTENSIVE":
                // IO-bound work: threads spend time waiting, so allow more of them
                return Executors.newFixedThreadPool(availableProcessors * 2);
            case "MIXED":
                // mixed workload
                return Executors.newWorkStealingPool();
            default:
                return Executors.newCachedThreadPool();
        }
    }

    /**
     * Creates a thread factory whose threads are daemon threads named
     * {@code prefix-1}, {@code prefix-2}, and so on.
     *
     * @param prefix name prefix for the created threads
     * @return a new factory with its own counter starting at 1
     */
    public static ThreadFactory namedThreadFactory(String prefix) {
        return new ThreadFactory() {
            private final AtomicInteger counter = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, prefix + "-" + counter.getAndIncrement());
                thread.setDaemon(true); // daemon threads do not keep the JVM alive
                return thread;
            }
        };
    }
}
超时控制与熔断机制
/**
 * Timeout wrapper and a minimal circuit breaker for {@link CompletableFuture}
 * based calls.
 */
public class TimeoutCircuitBreaker {

    /**
     * Wraps a future with a timeout.
     *
     * @param future  the future to guard
     * @param timeout how long to wait
     * @param unit    unit of {@code timeout}
     * @return a future that completes like {@code future} if it finishes
     *         first, or fails with a {@link TimeoutException} once the
     *         timeout elapses; the guarded future is not cancelled
     */
    public static <T> CompletableFuture<T> withTimeout(
            CompletableFuture<T> future, long timeout, TimeUnit unit) {
        CompletableFuture<T> timer = timeoutAfter(timeout, unit);
        return future.applyToEither(timer, Function.identity());
    }

    /** Creates a future that fails with a {@link TimeoutException} after the given delay. */
    private static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
        CompletableFuture<T> result = new CompletableFuture<>();
        Delayer.delayer.schedule(() ->
                result.completeExceptionally(new TimeoutException()),
                timeout, unit);
        return result;
    }

    /** Holder for the single scheduler thread that fires timeouts. */
    static class Delayer {
        private static final AtomicInteger THREAD_COUNTER = new AtomicInteger(1);

        // The thread factory is written inline: this class has no
        // namedThreadFactory method of its own. Threads are daemon threads so
        // pending timeouts never keep the JVM alive.
        static final ScheduledThreadPoolExecutor delayer =
                new ScheduledThreadPoolExecutor(1, r -> {
                    Thread thread = new Thread(
                            r, "CompletableFutureDelay-" + THREAD_COUNTER.getAndIncrement());
                    thread.setDaemon(true);
                    return thread;
                });
    }

    /**
     * Simple circuit breaker: after {@code failureThreshold} failed calls it
     * rejects further calls until {@code timeout} milliseconds have passed.
     */
    public static class CircuitBreaker {
        private final int failureThreshold;
        private final long timeout;
        private final AtomicInteger failures = new AtomicInteger(0);
        private volatile long lastFailureTime = 0;
        private volatile boolean isOpen = false;

        /**
         * @param failureThreshold number of failures that opens the breaker
         * @param timeout          how long the breaker stays open, in milliseconds
         */
        public CircuitBreaker(int failureThreshold, long timeout) {
            this.failureThreshold = failureThreshold;
            this.timeout = timeout;
        }

        /**
         * Runs the supplied asynchronous call unless the breaker is open.
         *
         * @param supplier starts the call; not invoked while the breaker is open
         * @return the call's future; a failure is counted and re-thrown as a
         *         {@link CompletionException}. While the breaker is open the
         *         returned future is already failed.
         */
        public <T> CompletableFuture<T> execute(Supplier<CompletableFuture<T>> supplier) {
            // NOTE(review): the original line was garbled between
            // "lastFailureTime" and the lambda body. The "< timeout" guard, the
            // rejection branch and the exceptionally(...) call are a
            // reconstruction - confirm the intended rejection behaviour.
            if (isOpen && System.currentTimeMillis() - lastFailureTime < timeout) {
                CompletableFuture<T> rejected = new CompletableFuture<>();
                rejected.completeExceptionally(new IllegalStateException("熔断器已打开,请求被拒绝"));
                return rejected;
            }
            return supplier.get().exceptionally(throwable -> {
                handleFailure();
                // The cast keeps the thrown expression unchecked; without it
                // the conditional has static type Throwable and cannot be
                // thrown from the lambda.
                throw throwable instanceof CompletionException
                        ? (CompletionException) throwable
                        : new CompletionException(throwable);
            });
        }

        /** Records one failure and opens the breaker once the threshold is reached. */
        private void handleFailure() {
            int currentFailures = failures.incrementAndGet();
            lastFailureTime = System.currentTimeMillis();
            if (currentFailures >= failureThreshold) {
                isOpen = true;
                // schedule automatic recovery
                CompletableFuture.delayedExecutor(timeout, TimeUnit.MILLISECONDS)
                        .execute(() -> {
                            isOpen = false;
                            failures.set(0);
                        });
            }
        }
    }
}
六、最佳实践与注意事项
代码组织规范
- 合理划分异步任务边界,避免过度拆分
- 使用有意义的线程名称,便于问题排查
- 合理设置超时时间,防止资源泄漏
- 及时关闭自定义线程池,释放资源
异常处理策略
public class ExceptionHandlingBestPractices {
/**
* 统一的异常处理模式
*/
public static CompletableFuture withProperExceptionHandling(
Supplier<CompletableFuture> task) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.get();
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}).thenCompose(future -> future)
.exceptionally(throwable -> {
// 记录日志
logError(throwable);
// 根据异常类型进行不同处理
return handleException(throwable);
});
}
private static T handleException(Throwable throwable) {
if (throwable instanceof BusinessException) {
// 业务异常处理
throw new CompletionException(throwable);
} else if (throwable instanceof TimeoutException) {
// 超时处理
throw new CompletionException("操作超时", throwable);
} else {
// 系统异常处理
throw new CompletionException("系统错误", throwable);
}
}
private static void logError(Throwable throwable) {
System.err.println("异步任务执行异常: " + throwable.getMessage());
// 实际项目中应该使用日志框架
}
}
监控与调试技巧
/**
 * Helpers for timing asynchronous tasks and for periodically printing
 * thread-pool statistics.
 */
public class CompletableFutureMonitor {

    /**
     * Starts the task and prints how long it took once it completes.
     *
     * @param taskName label used in the printed line
     * @param task     starts the asynchronous work; invoked immediately on the calling thread
     * @return a future with the same outcome as the task's future
     */
    public static <T> CompletableFuture<T> withMonitoring(
            String taskName, Supplier<CompletableFuture<T>> task) {
        // nanoTime is monotonic, so the duration is not affected by clock adjustments
        long startTime = System.nanoTime();
        return task.get().whenComplete((result, throwable) -> {
            long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
            if (throwable != null) {
                System.out.printf("任务 %s 执行失败,耗时: %dms,异常: %s%n",
                        taskName, duration, throwable.getMessage());
            } else {
                System.out.printf("任务 %s 执行成功,耗时: %dms%n", taskName, duration);
            }
        });
    }

    /**
     * Prints the pool's active thread count, queue size and completed task
     * count once per second. Does nothing unless the executor is a
     * {@link ThreadPoolExecutor}. Reporting stops by itself after the
     * monitored pool has terminated.
     *
     * @param executor the pool to watch
     * @param poolName label used in the printed line
     */
    public static void monitorThreadPool(ExecutorService executor, String poolName) {
        if (!(executor instanceof ThreadPoolExecutor)) {
            return;
        }
        ThreadPoolExecutor tp = (ThreadPoolExecutor) executor;
        // Daemon thread: the reporter must never be the reason the JVM stays alive.
        ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, poolName + "-monitor");
            thread.setDaemon(true);
            return thread;
        });
        monitor.scheduleAtFixedRate(() -> {
            System.out.printf("线程池 %s 状态: 活跃线程=%d, 队列大小=%d, 完成任务=%d%n",
                    poolName, tp.getActiveCount(), tp.getQueue().size(), tp.getCompletedTaskCount());
            if (tp.isTerminated()) {
                // Nothing left to report; shutting down cancels this periodic task.
                monitor.shutdown();
            }
        }, 0, 1, TimeUnit.SECONDS);
    }
}