Java并发编程新范式:结构化并发实战与复杂任务编排
一、结构化并发核心理念
Java 19引入的并发编程新模型:
// 基础结构化并发示例
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future userFuture = scope.fork(() -> fetchUser(userId));
Future orderFuture = scope.fork(() -> fetchOrderCount(userId));
scope.join(); // 等待所有子任务
scope.throwIfFailed(); // 检查异常
return new UserProfile(
userFuture.resultNow(),
orderFuture.resultNow()
);
}
// 与传统并发对比
void traditionalConcurrency() throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
Future userFuture = executor.submit(() -> fetchUser(userId));
Future orderFuture = executor.submit(() -> fetchOrderCount(userId));
try {
String user = userFuture.get();
Integer orders = orderFuture.get();
return new UserProfile(user, orders);
} finally {
executor.shutdown();
}
}
核心优势:生命周期管理、错误传播、可观测性、代码简洁
二、高级任务编排模式
1. 复杂依赖关系处理
// 多阶段任务编排
UserProfile loadUserProfile(String userId) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 第一阶段:获取基础数据
Future userFuture = scope.fork(() -> userService.get(userId));
Future<List> ordersFuture = scope.fork(() -> orderService.list(userId));
scope.join();
scope.throwIfFailed();
User user = userFuture.resultNow();
List orders = ordersFuture.resultNow();
// 第二阶段:并行处理关联数据
try (var innerScope = new StructuredTaskScope.ShutdownOnFailure()) {
Future addressFuture = innerScope.fork(() ->
addressService.getPrimary(user.addressIds()));
Future paymentFuture = innerScope.fork(() ->
paymentService.getDefault(userId));
Future<List> recFuture = innerScope.fork(() ->
recommendationService.getForUser(userId, 5));
innerScope.join();
innerScope.throwIfFailed();
return new UserProfile(
user,
orders,
addressFuture.resultNow(),
paymentFuture.resultNow(),
recFuture.resultNow()
);
}
}
}
2. 超时与取消控制
// 带超时的结构化并发
T executeWithTimeout(Callable task, Duration timeout) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future future = scope.fork(task);
Future timer = scope.fork(() -> {
Thread.sleep(timeout.toMillis());
throw new TimeoutException("操作超时");
});
scope.join();
return future.resultNow();
}
}
// 响应取消请求
void processOrder(Order order, CancellationToken token) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnCancellation()) {
scope.fork(() -> {
while (!token.isCancelled()) {
// 处理订单逻辑
Thread.sleep(100);
}
throw new CancellationException("任务被取消");
});
scope.join();
} catch (ExecutionException e) {
if (e.getCause() instanceof CancellationException) {
System.out.println("正常处理取消逻辑");
} else {
throw e;
}
}
}
三、虚拟线程集成方案
1. 大规模并发处理
// 虚拟线程池配置
ExecutorService virtualExecutor = Executors.newThreadPerTaskExecutor(
Thread.ofVirtual()
.name("worker-", 0)
.factory());
// 批量任务处理
List batchProcess(List requests) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
List<Future> futures = requests.stream()
.map(req -> scope.fork(() -> processSingle(req)))
.toList();
scope.join();
return futures.stream()
.map(Future::resultNow)
.toList();
}
}
// 资源清理保证
void safeResourceOperation() {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future resourceFuture = scope.fork(this::acquireResource);
scope.join();
try (Resource resource = resourceFuture.resultNow()) {
// 使用资源
resource.process();
}
}
}
2. 与CompletableFuture互操作
// 结构化并发转CompletableFuture
CompletableFuture structuredToFuture(Callable task) {
var future = new CompletableFuture();
Thread.ofVirtual().start(() -> {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future result = scope.fork(task);
scope.join();
future.complete(result.resultNow());
} catch (Exception e) {
future.completeExceptionally(e);
}
});
return future;
}
// CompletableFuture转结构化并发
T futureToStructured(CompletableFuture future) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future adapted = scope.fork(() -> {
try {
return future.get();
} catch (InterruptedException e) {
throw new CancellationException();
}
});
scope.join();
return adapted.resultNow();
}
}
四、电商订单系统实战
1. 分布式事务协调
// 订单创建分布式事务
OrderResult createOrder(OrderRequest request) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 并行验证
Future inventoryCheck = scope.fork(() ->
inventoryService.reserve(request.items()));
Future paymentAuth = scope.fork(() ->
paymentService.authorize(request.paymentMethod(), request.amount()));
scope.join();
scope.throwIfFailed();
// 串行执行
try (var innerScope = new StructuredTaskScope.ShutdownOnFailure()) {
Future orderFuture = innerScope.fork(() ->
orderRepository.create(request, inventoryCheck.resultNow()));
Future paymentFuture = innerScope.fork(() ->
paymentService.capture(paymentAuth.resultNow().authId()));
innerScope.join();
innerScope.throwIfFailed();
// 异步通知
Thread.ofVirtual().start(() -> {
notificationService.sendOrderCreated(
orderFuture.resultNow().orderId());
});
return new OrderResult(
orderFuture.resultNow(),
paymentFuture.resultNow()
);
}
}
}
五、生产环境最佳实践
- 错误处理:合理区分业务异常与系统异常
- 监控指标:跟踪任务执行时间和成功率
- 资源限制:控制最大并发子任务数量
- 线程本地:使用ScopedValue替代ThreadLocal
- 调试支持:利用JDK Flight Recorder分析任务流