在微服务架构中,一个聚合接口常常需要并行调用多个下游服务,然后整合结果。传统做法依赖ExecutorService和Future,但这种方式容易产生线程泄露、错误处理复杂、任务间的关系模糊等问题。Java 21以预览特性引入的结构化并发(Structured Concurrency),通过StructuredTaskScope将一组并发任务的生命周期限定在明确的代码块内,实现了更安全、更清晰的并行编程范式。本文将基于一个完整的聚合查询案例,从原理到实战,彻底解锁这一未来并发标准。
一、传统并行编程的典型陷阱
考虑一个场景:获取用户订单详情,需要同时查询用户服务、订单服务和库存服务。典型的传统代码如下:
ExecutorService executor = Executors.newFixedThreadPool(3);
Future<User> userFuture = executor.submit(() -> userService.getUser(id));
Future<Order> orderFuture = executor.submit(() -> orderService.getOrder(id));
Future<Inventory> invFuture = executor.submit(() -> inventoryService.getInventory(id));
User user = userFuture.get();
Order order = orderFuture.get();
Inventory inv = invFuture.get();
executor.shutdown();
这段代码看似正常,但存在诸多隐患:
- 如果
userService.getUser(id)抛出异常,其他已提交的任务仍在运行,浪费资源。 - 忘记调用
shutdown()会导致线程泄露。 - 三个任务的完成结果彼此无关,但业务上它们是同一个聚合请求的一部分,如果任一失败,整个请求理应失败,而传统方式难以全局取消。
结构化并发正是为解决这些问题而生:它要求所有子任务必须在一个显式的作用域内声明和执行,作用域结束时确保所有子任务已完成或被取消。
二、StructuredTaskScope核心API
结构化并发的核心类是java.util.concurrent.StructuredTaskScope(Java 21中为预览API,使用时需添加--enable-preview)。它有两个预定义子类:
ShutdownOnSuccess:任何一个任务成功就返回其结果,并自动取消其余任务。ShutdownOnFailure:任一任务失败就取消所有任务,并抛出异常。
基类StructuredTaskScope实现了AutoCloseable,因此可以在try-with-resources中使用,确保所有线程被恰当管理。其典型使用模式:
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String> task1 = scope.fork(() -> service1.call());
Future<Integer> task2 = scope.fork(() -> service2.call());
scope.join(); // 等待所有任务完成(或被取消)
scope.throwIfFailed(); // 若有失败,则抛出异常
String result1 = task1.resultNow(); // 获取结果(此时已确保完成)
int result2 = task2.resultNow();
// 组合结果...
}
在try块结束时,作用域自动关闭,确保未完成的子任务被中断并等待终止。这从根本上避免了线程泄露。
三、实战:聚合订单详情API
我们实现一个Spring Boot风格的聚合服务,同时调用用户服务、订单服务和库存服务。为演示结构化并发的错误处理与自动取消,其中一个服务可能延迟或失败。
3.1 模拟下游服务
// 模拟服务接口
interface UserService {
User getUser(Long id) throws Exception;
}
interface OrderService {
Order getOrder(Long id) throws Exception;
}
interface InventoryService {
Inventory getInventory(Long orderId) throws Exception;
}
// 简单模拟实现
class MockUserService implements UserService {
public User getUser(Long id) throws Exception {
Thread.sleep(200); // 模拟延迟
return new User(id, "张伟");
}
}
class MockOrderService implements OrderService {
public Order getOrder(Long id) throws Exception {
// 模拟偶尔失败
if (Math.random() < 0.1) throw new RuntimeException("订单服务暂不可用");
Thread.sleep(150);
return new Order(id, "商品A", 2);
}
}
class MockInventoryService implements InventoryService {
public Inventory getInventory(Long orderId) throws Exception {
Thread.sleep(180);
return new Inventory(orderId, "充足");
}
}
3.2 使用StructuredTaskScope并行调用
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;
public class OrderAggregator {
private final UserService userService = new MockUserService();
private final OrderService orderService = new MockOrderService();
private final InventoryService inventoryService = new MockInventoryService();
public OrderDetail aggregate(Long userId, Long orderId) throws Exception {
// 使用 ShutdownOnFailure 确保任一失败即取消全部
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<User> userFuture = scope.fork(() -> userService.getUser(userId));
Future<Order> orderFuture = scope.fork(() -> orderService.getOrder(orderId));
Future<Inventory> invFuture = scope.fork(() -> inventoryService.getInventory(orderId));
// 等待所有任务完成(或某个失败导致取消)
scope.join();
// 如果任何任务失败,这里会抛出异常(保留第一个异常)
scope.throwIfFailed();
// 所有任务成功,安全获取结果
return new OrderDetail(
userFuture.resultNow(),
orderFuture.resultNow(),
invFuture.resultNow()
);
}
}
}
3.3 使用ShutdownOnSuccess优化(快速响应)
如果存在冗余服务(多个副本),可以配置为只要一个成功就立即返回,并取消其他请求:
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<User>()) {
scope.fork(() -> userServiceA.getUser(id));
scope.fork(() -> userServiceB.getUser(id));
scope.join();
User user = scope.result(); // 获取第一个成功的结果
// 其他请求被自动取消
}
这非常适合实现快速回退或竞争请求模式。
四、结合虚拟线程:极致轻量的并行
结构化并发与虚拟线程天然契合。StructuredTaskScope默认使用虚拟线程执行子任务(如果未指定线程工厂),因此即使并发数千个子任务也不会耗尽平台线程。例如,批量查询大量商品详情:
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
List<Future<ProductDetail>> futures = productIds.stream()
.map(id -> scope.fork(() -> fetchProductDetail(id)))
.toList();
scope.join();
scope.throwIfFailed();
List<ProductDetail> details = futures.stream()
.map(Future::resultNow)
.toList();
}
此代码在数千个并发的场景下仍能保持极低资源消耗,因为每个子任务都跑在轻量的虚拟线程上。结构化并发的强制生命周期保证了所有虚拟线程在退出try块前被正确回收。
五、自定义作用域策略
除了ShutdownOnSuccess和ShutdownOnFailure,还可以通过继承StructuredTaskScope实现自定义策略。例如,收集所有子任务的结果(包括失败),然后自己决定如何处理:
class CollectAllScope<T> extends StructuredTaskScope<T> {
private final List<Future<T>> futures = new ArrayList<>();
@Override
protected void handleComplete(Future<T> future) {
futures.add(future);
}
public List<Future<T>> allFutures() {
return futures;
}
}
使用时:
try (var scope = new CollectAllScope<String>()) {
scope.fork(() -> service1.call());
scope.fork(() -> service2.call());
scope.join(); // 等待所有完成
for (Future<String> f : scope.allFutures()) {
if (f.state() == Future.State.SUCCESS) {
System.out.println("成功: " + f.resultNow());
} else {
System.out.println("失败: " + f.exceptionNow().getMessage());
}
}
}
六、与传统方式对比总结
| 特性 | ExecutorService + Future | StructuredTaskScope |
|---|---|---|
| 生命周期管理 | 手动shutdown,易泄露 | try-with-resources自动管理 |
| 错误传播 | 需逐个检查Future,其他任务仍运行 | 一个失败自动取消所有,throwIfFailed集中处理 |
| 线程使用 | 平台线程或池化 | 默认虚拟线程,极轻量 |
| 代码可读性 | 分散的try-catch和线程池关闭 | 代码块清晰,父子任务关系直观 |
七、启用预览特性与未来展望
在Java 21中使用结构化并发需编译和运行时添加--enable-preview。例如:
javac --release 21 --enable-preview Main.java
java --enable-preview Main
结构化并发在Java 21中是孵化特性,但在Java 22中已进入第二轮预览,预计在Java 23或24中正式定案。随着虚拟线程的普及,结构化并发必将成为主流并行编程模型,值得每一位Java开发者提前掌握。
八、总结
通过聚合订单的实战案例,我们看到结构化并发将“一组并发任务”作为一个整体来管理,显著提升了代码的健壮性和可读性。它消除了手动管理线程池生命周期带来的资源泄露风险,并通过自动取消机制实现了快速失败。结合虚拟线程,我们可以以极小的开销并行处理大量请求,为微服务聚合、批处理操作等场景带来了全新的可能性。现在,就在你的Java 21+项目中启用预览,体验结构化并发的强大吧。

