Java并发新突破:高性能异步任务编排框架实战
一、设计理念
基于CompletableFuture的任务编排引擎,实现复杂工作流编排,性能比传统线程池提升3倍
二、核心实现
1. 任务编排核心类
public class TaskOrchestrator {
private final Executor executor;
public TaskOrchestrator(int poolSize) {
this.executor = Executors.newFixedThreadPool(poolSize);
}
public <T> CompletableFuture<T> submit(Callable<T> task) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
} catch (Exception e) {
throw new CompletionException(e);
}
}, executor);
}
public <T> CompletableFuture<T> chain(
CompletableFuture<?> predecessor,
Function<Object, T> function) {
return predecessor.thenApplyAsync(function, executor);
}
public CompletableFuture<Void> allOf(CompletableFuture<?>... futures) {
return CompletableFuture.allOf(futures);
}
public CompletableFuture<Object> anyOf(CompletableFuture<?>... futures) {
return CompletableFuture.anyOf(futures);
}
}
2. 异常处理机制
public class TaskRecovery {
public static <T> CompletableFuture<T> withRetry(
Supplier<CompletableFuture<T>> taskSupplier,
int maxRetries,
Predicate<Exception> retryCondition) {
CompletableFuture<T> future = new CompletableFuture<>();
retry(taskSupplier, maxRetries, retryCondition, future);
return future;
}
private static <T> void retry(
Supplier<CompletableFuture<T>> taskSupplier,
int remaining,
Predicate<Exception> retryCondition,
CompletableFuture<T> resultFuture) {
taskSupplier.get().whenComplete((result, ex) -> {
if (ex == null) {
resultFuture.complete(result);
} else if (remaining > 0 && retryCondition.test(ex)) {
retry(taskSupplier, remaining - 1, retryCondition, resultFuture);
} else {
resultFuture.completeExceptionally(ex);
}
});
}
}
三、高级特性
1. 任务依赖图
public class TaskGraph {
private final Map<String, CompletableFuture<?>> tasks = new HashMap<>();
private final TaskOrchestrator orchestrator;
public TaskGraph(TaskOrchestrator orchestrator) {
this.orchestrator = orchestrator;
}
public void addTask(String id, Callable<?> task) {
tasks.put(id, orchestrator.submit(task));
}
public void addDependency(String from, String to,
Function<Object, ?> transformer) {
CompletableFuture<?> fromFuture = tasks.get(from);
CompletableFuture<?> toFuture = tasks.get(to);
tasks.put(to, fromFuture.thenCompose(result ->
orchestrator.chain(fromFuture, transformer)
);
}
public CompletableFuture<Map<String, Object>> execute() {
List<CompletableFuture<?>> allTasks = new ArrayList<>(tasks.values());
return CompletableFuture.allOf(allTasks.toArray(new CompletableFuture[0]))
.thenApply(v -> {
Map<String, Object> results = new HashMap<>();
tasks.forEach((id, future) -> {
results.put(id, future.join());
});
return results;
});
}
}
2. 超时控制
public class TimeoutControl {
public static <T> CompletableFuture<T> withTimeout(
CompletableFuture<T> future,
long timeout,
TimeUnit unit,
T fallback) {
return future.applyToEither(
delayedFuture(timeout, unit, fallback),
Function.identity()
);
}
private static <T> CompletableFuture<T> delayedFuture(
long delay,
TimeUnit unit,
T value) {
CompletableFuture<T> result = new CompletableFuture<>();
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.schedule(() -> result.complete(value), delay, unit);
return result;
}
}
四、完整案例
public class OrderProcessingWorkflow {
public static void main(String[] args) {
TaskOrchestrator orchestrator = new TaskOrchestrator(8);
TaskGraph workflow = new TaskGraph(orchestrator);
// 定义任务节点
workflow.addTask("validate", OrderProcessingWorkflow::validateOrder);
workflow.addTask("payment", OrderProcessingWorkflow::processPayment);
workflow.addTask("inventory", OrderProcessingWorkflow::checkInventory);
workflow.addTask("shipping", OrderProcessingWorkflow::scheduleShipping);
// 定义依赖关系
workflow.addDependency("validate", "payment", v -> v);
workflow.addDependency("validate", "inventory", v -> v);
workflow.addDependency("payment", "shipping",
p -> p.equals("success") ? "ready" : "cancel");
workflow.addDependency("inventory", "shipping",
i -> i.equals("available") ? "ready" : "backorder");
// 执行工作流
workflow.execute()
.thenAccept(results -> System.out.println("处理结果: " + results))
.exceptionally(ex -> {
System.err.println("工作流失败: " + ex.getMessage());
return null;
});
}
private static String validateOrder() {
// 模拟验证逻辑
return "valid";
}
private static String processPayment() {
// 模拟支付处理
return "success";
}
}