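Published: May 25, 2024
I. The Modern Java Concurrency Landscape
This tutorial builds a complete e-commerce flash-sale system on Java 21, covering the following core technologies:
- Virtual threads: Project Loom, targeting millions of concurrent connections
- Structured concurrency: task lifecycle management
- Reactive programming: Spring WebFlux integration
- Distributed transactions: eventual consistency with Seata
- Real-time monitoring: Micrometer + Prometheus
Core system modules:
- Flash-sale campaign management
- Inventory warm-up and deduction
- Distributed rate limiting and circuit breaking
- Eventual consistency for orders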
II. Environment Setup and Project Scaffolding
1. Configuring JDK 21
# Download JDK 21
wget https://download.java.net/java/GA/jdk21.0.2/f2283984656d49d69e91c558476027ac/13/GPL/openjdk-21.0.2_linux-x64_bin.tar.gz
# Extract the archive (it unpacks to a jdk-21.0.2 directory) and set environment variables
tar -xzf openjdk-21.0.2_linux-x64_bin.tar.gz
export JAVA_HOME=/path/to/jdk-21.0.2
export PATH=$JAVA_HOME/bin:$PATH
2. Initializing the Spring Boot Project
<!-- Core dependencies in pom.xml -->
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    </dependency>
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
    </dependency>
</dependencies>
// Expose an executor that starts one virtual thread per task
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ThreadConfig {
    @Bean
    public ExecutorService virtualThreadExecutor() {
        return Executors.newVirtualThreadPerTaskExecutor();
    }
}
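To see what this executor does outside of Spring, the following is a minimal plain-JDK sketch. The class name `VirtualThreadDemo` and the 10 ms sleep are illustrative assumptions, not part of the project; the sleep stands in for a blocking I/O call.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: runs many blocking tasks, one virtual thread each.
class VirtualThreadDemo {

    /** Submits taskCount blocking tasks and returns how many of them ran on a virtual thread. */
    static int runBlockingTasks(int taskCount) throws Exception {
        List<Future<Boolean>> futures = new ArrayList<>();
        // try-with-resources: close() waits until every submitted task has finished
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                futures.add(executor.submit(() -> {
                    Thread.sleep(Duration.ofMillis(10)); // stand-in for blocking I/O
                    return Thread.currentThread().isVirtual();
                }));
            }
        }
        int virtualCount = 0;
        for (Future<Boolean> future : futures) {
            if (future.get()) {
                virtualCount++;
            }
        }
        return virtualCount;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runBlockingTasks(10_000)); // prints 10000
    }
}
```

Because a sleeping virtual thread releases its carrier thread, the 10,000 tasks overlap instead of queueing behind a fixed number of platform threads.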
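III. Virtual Threads in Depth
1. An HTTP Service for a Million Connections
import java.util.concurrent.Executor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@RestController
public class FlashSaleController {
    // InventoryService and OrderService are the application's own blocking services (not shown)
    private final InventoryService inventoryService;
    private final OrderService orderService;
    private final Executor virtualThreadExecutor;
    public FlashSaleController(InventoryService inventoryService, OrderService orderService,
                               Executor virtualThreadExecutor) {
        this.inventoryService = inventoryService;
        this.orderService = orderService;
        this.virtualThreadExecutor = virtualThreadExecutor;
    }
    @GetMapping("/flashsale/{itemId}")
    public Mono<ResponseEntity<String>> flashSale(
            @PathVariable String itemId,
            @RequestHeader("userId") String userId) {
        return Mono.fromCallable(() -> {
            // Blocking calls are acceptable here because they run on a virtual thread
            boolean success = inventoryService.deduct(itemId, userId);
            if (success) {
                orderService.create(itemId, userId);
                return ResponseEntity.ok("Flash sale succeeded");
            }
            return ResponseEntity.status(429).body("Out of stock");
        }).subscribeOn(Schedulers.fromExecutor(virtualThreadExecutor));
    }
}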
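The controller relies on `inventoryService.deduct` never selling more units than exist, but that method is not shown. As a rough illustration of the invariant, here is an in-memory sketch built on a compare-and-set loop. `InMemoryInventory` and `simulate` are hypothetical names, and a real deployment would more likely keep stock in Redis or a database.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory stock counter; illustrates the no-oversell invariant only.
class InMemoryInventory {
    private final AtomicInteger stock;

    InMemoryInventory(int initialStock) {
        this.stock = new AtomicInteger(initialStock);
    }

    /** Takes one unit if any is left; the CAS loop keeps stock from dropping below zero. */
    boolean deduct() {
        while (true) {
            int current = stock.get();
            if (current <= 0) {
                return false; // sold out
            }
            if (stock.compareAndSet(current, current - 1)) {
                return true;  // this caller won the unit
            }
            // another thread changed the value in between: retry with the fresh value
        }
    }

    int remaining() {
        return stock.get();
    }

    /** Fires `buyers` concurrent deduct() calls on virtual threads and returns the number of winners. */
    static int simulate(int initialStock, int buyers) {
        InMemoryInventory inventory = new InMemoryInventory(initialStock);
        AtomicInteger winners = new AtomicInteger();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < buyers; i++) {
                executor.submit(() -> {
                    if (inventory.deduct()) {
                        winners.incrementAndGet();
                    }
                });
            }
        } // close() blocks until every buyer has finished
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println(simulate(100, 10_000)); // prints 100
    }
}
```

However the 10,000 buyers interleave, exactly 100 of them succeed, which is the property a flash-sale deduction has to guarantee.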
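2. Managing Tasks with Structured Concurrency
// StructuredTaskScope is a preview API in Java 21 (JEP 453): compile and run with --enable-preview
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
public CompletableFuture<Boolean> handleFlashSale(String itemId, String userId) {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        // In Java 21, fork() returns a Subtask rather than a Future
        Subtask<Boolean> inventoryTask = scope.fork(() ->
                inventoryService.deduct(itemId, userId));
        Subtask<Boolean> riskTask = scope.fork(() ->
                riskControlService.check(userId));
        scope.join();           // wait for both subtasks, or for the first failure
        scope.throwIfFailed();  // rethrow a subtask failure as ExecutionException
        // Caution: if the risk check returns false, the stock has already been deducted
        // and must be restored by the caller
        return CompletableFuture.completedFuture(
                inventoryTask.get() && riskTask.get());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status for callers
        return CompletableFuture.failedFuture(e);
    } catch (ExecutionException e) {
        return CompletableFuture.failedFuture(e.getCause());
    }
}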
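If enabling preview features is not an option, the same two-check fan-out can be approximated with a plain virtual-thread executor. This is only a sketch: `ParallelChecks` and `bothPass` are hypothetical names, and unlike `ShutdownOnFailure` it does not react to a failure in the second task until the first one has completed.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper: runs two checks concurrently without the preview API.
class ParallelChecks {

    /** Runs both checks concurrently and returns true only if both return true. */
    static boolean bothPass(Callable<Boolean> inventoryCheck, Callable<Boolean> riskCheck)
            throws InterruptedException, ExecutionException {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            Future<Boolean> inventory = executor.submit(inventoryCheck);
            Future<Boolean> risk = executor.submit(riskCheck);
            try {
                // && short-circuits: risk.get() is skipped when the inventory check returns false
                return inventory.get() && risk.get();
            } finally {
                // No effect on completed tasks; interrupts a task that is still running
                // after a failure or a short-circuit
                inventory.cancel(true);
                risk.cancel(true);
            }
        } // close() waits for both tasks to terminate, so none outlives this method
    }

    public static void main(String[] args) throws Exception {
        System.out.println(bothPass(() -> true, () -> true));  // prints true
        System.out.println(bothPass(() -> true, () -> false)); // prints false
    }
}
```

The try-with-resources block gives the key structured-concurrency property, that no task outlives the method that started it, while the `finally` block approximates sibling cancellation.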
IV. Reactive Programming Integration
1. A High-Performance WebFlux Endpoint
@RestController
@RequestMapping("/api/v2/flashsale")
public class ReactiveFlashSaleController {
    private static final Logger log = LoggerFactory.getLogger(ReactiveFlashSaleController.class);
    @Autowired
    private ReactiveInventoryService inventoryService;
    @Autowired
    private ReactiveOrderService orderService;
    @PostMapping("/{itemId}")
    public Mono<ResponseEntity<String>> flashSale(
            @PathVariable String itemId,
            @RequestHeader("userId") String userId) {
        return inventoryService.deduct(itemId, userId)
                .flatMap(success -> {
                    if (success) {
                        return orderService.create(itemId, userId)
                                .thenReturn(ResponseEntity.ok("Flash sale succeeded"));
                    }
                    return Mono.just(ResponseEntity.status(429).body("Out of stock"));
                })
                .onErrorResume(e -> {
                    // Log the cause before mapping it to a generic response
                    log.error("Flash sale failed for item {}", itemId, e);
                    return Mono.just(ResponseEntity.status(500).body("System busy"));
                });
    }
}
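2. Backpressure and Rate Limiting
@Service
public class ReactiveInventoryService {
    private static final Logger log = LoggerFactory.getLogger(ReactiveInventoryService.class);
    // Multicast sink with a buffer of 1000 events
    private final Sinks.Many<InventoryEvent> inventorySink =
            Sinks.many().multicast().onBackpressureBuffer(1000);
    private final InventoryRepository inventoryRepository;
    public ReactiveInventoryService(InventoryRepository inventoryRepository) {
        this.inventoryRepository = inventoryRepository;
    }
    public Flux<InventoryEvent> getInventoryStream(String itemId) {
        return inventorySink.asFlux()
                .filter(event -> event.getItemId().equals(itemId))
                // Drop (and log) events that a slow subscriber cannot keep up with
                .onBackpressureDrop(event ->
                        log.warn("Dropping inventory event: {}", event));
    }
    // Assumes Resilience4j, which is not in the dependency list above. Its @RateLimiter takes an
    // instance name; the limit (1000 permits per second, 1 s wait) is set in application.yml under
    // resilience4j.ratelimiter.instances.inventoryDeduct (limit-for-period, limit-refresh-period,
    // timeout-duration).
    @RateLimiter(name = "inventoryDeduct")
    public Mono<Boolean> deduct(String itemId, String userId) {
        // The repository call blocks, so run it on the boundedElastic scheduler
        return Mono.fromCallable(() ->
                inventoryRepository.deduct(itemId))
                .subscribeOn(Schedulers.boundedElastic());
    }
}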
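To make the rate-limiting idea concrete without any framework, the following is a minimal token-bucket sketch. It is not what the annotation does internally. `TokenBucket` is a hypothetical class, it rejects immediately instead of waiting for a timeout, and the clock is injected so the behavior can be checked deterministically.

```java
import java.util.function.LongSupplier;

// Hypothetical non-blocking token bucket; the nanosecond clock is injected for testability.
class TokenBucket {
    private static final long NANOS_PER_SECOND = 1_000_000_000L;

    private final long capacity;
    private final long refillPerSecond;
    private final LongSupplier nanoClock;
    private long tokens;
    private long lastRefill;

    TokenBucket(long capacity, long refillPerSecond, LongSupplier nanoClock) {
        this.capacity = capacity;
        this.refillPerSecond = refillPerSecond;
        this.nanoClock = nanoClock;
        this.tokens = capacity;              // start with a full bucket
        this.lastRefill = nanoClock.getAsLong();
    }

    /** Consumes one token and returns true if one is available; never blocks. */
    synchronized boolean tryAcquire() {
        long elapsed = nanoClock.getAsLong() - lastRefill;
        // Whole tokens earned since the last refill (integer math avoids rounding drift;
        // the product can overflow only after months of idle time at typical rates)
        long earned = elapsed * refillPerSecond / NANOS_PER_SECOND;
        if (earned > 0) {
            tokens = Math.min(capacity, tokens + earned);
            // Advance only by the time that produced whole tokens so fractions carry over
            lastRefill += earned * NANOS_PER_SECOND / refillPerSecond;
        }
        if (tokens > 0) {
            tokens--;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        TokenBucket limiter = new TokenBucket(1000, 1000, System::nanoTime);
        int accepted = 0;
        for (int i = 0; i < 5000; i++) {
            if (limiter.tryAcquire()) {
                accepted++;
            }
        }
        System.out.println("accepted " + accepted + " of 5000 immediate requests");
    }
}
```

In a flash sale, rejecting excess requests at once is usually preferable to queueing them, because a queued request is likely to find the stock gone anyway.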
V. Implementing Distributed Transactions
1. Seata Distributed Transaction Configuration
# application.yml
seata:
  enabled: true
  application-id: flashsale-service
  tx-service-group: my_seata_tx_group
  service:
    vgroup-mapping:
      my_seata_tx_group: default
    grouplist:
      default: 127.0.0.1:8091
// Global transaction annotation
@GlobalTransactional
public boolean processFlashSale(String itemId, String userId) {
    // Both calls join the same Seata global transaction
    boolean deducted = inventoryService.deduct(itemId);
    if (deducted) {
        return orderService.create(itemId, userId);
    }
    return false;
}
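2. Eventual Consistency Pattern
@Service
public class InventoryService {
    private final InventoryRepository inventoryRepository;
    private final ApplicationEventPublisher eventPublisher;
    public InventoryService(InventoryRepository inventoryRepository,
                            ApplicationEventPublisher eventPublisher) {
        this.inventoryRepository = inventoryRepository;
        this.eventPublisher = eventPublisher;
    }
    @Transactional
    public boolean deductWithConfirm(String itemId) {
        // 1. Reserve (freeze) the stock
        inventoryRepository.freeze(itemId, 1);
        // 2. Publish the event inside the surrounding transaction; the listener below
        //    is invoked only if that transaction commits. The event is in-process only,
        //    so stock frozen before a crash still needs a reconciliation job.
        eventPublisher.publishEvent(new InventoryDeductEvent(itemId, 1));
        return true;
    }
    // Runs after the commit above, so the write needs a transaction of its own
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void confirmDeduct(InventoryDeductEvent event) {
        // 3. Perform the actual deduction
        inventoryRepository.realDeduct(
                event.getItemId(),
                event.getAmount());
    }
}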
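The repository methods `freeze` and `realDeduct` are not shown, so the following in-memory sketch illustrates the bookkeeping they imply. `ReservableStock` is a hypothetical class, and the `cancel` step is an added assumption covering the case where the order is never created.

```java
// Hypothetical in-memory model of the freeze / confirm / cancel bookkeeping.
class ReservableStock {
    private int available;
    private int frozen;

    ReservableStock(int available) {
        this.available = available;
    }

    /** Step 1: move units from available to frozen; fails if stock is insufficient. */
    synchronized boolean freeze(int amount) {
        if (amount <= 0 || available < amount) {
            return false;
        }
        available -= amount;
        frozen += amount;
        return true;
    }

    /** Step 3: the order went through, so the frozen units are gone for good. */
    synchronized void confirm(int amount) {
        requireFrozen(amount);
        frozen -= amount;
    }

    /** Compensation: the order was not created, so the frozen units go back on sale. */
    synchronized void cancel(int amount) {
        requireFrozen(amount);
        frozen -= amount;
        available += amount;
    }

    private void requireFrozen(int amount) {
        if (amount <= 0 || amount > frozen) {
            throw new IllegalStateException("cannot release " + amount + ", frozen=" + frozen);
        }
    }

    synchronized int available() {
        return available;
    }

    synchronized int frozen() {
        return frozen;
    }

    public static void main(String[] args) {
        ReservableStock stock = new ReservableStock(2);
        stock.freeze(1);   // available=1, frozen=1
        stock.confirm(1);  // available=1, frozen=0
        System.out.println(stock.available() + " available, " + stock.frozen() + " frozen");
        // prints "1 available, 0 frozen"
    }
}
```

Keeping frozen units separate from available ones means a failed order can be undone without ever having exposed those units to other buyers.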
VI. Monitoring and Tuning
1. Metrics with Micrometer
@Configuration
public class MetricsConfig {
    // Tag every metric with the application name
    @Bean
    public MeterRegistryCustomizer<PrometheusMeterRegistry> metricsCommonTags() {
        return registry -> registry.config().commonTags(
                "application", "flashsale-service");
    }
    // Enables @Timed on methods; requires spring-boot-starter-aop on the classpath
    @Bean
    public TimedAspect timedAspect(MeterRegistry registry) {
        return new TimedAspect(registry);
    }
}
// Business metrics
@RestController
public class MetricsController {
    private final Counter requestCounter;
    private final Timer processTimer;
    public MetricsController(MeterRegistry registry) {
        this.requestCounter = registry.counter("flashsale.requests");
        this.processTimer = registry.timer("flashsale.process.time");
    }
    @GetMapping("/flashsale/{itemId}")
    public String flashSale(...) {
        requestCounter.increment();
        return processTimer.record(() -> {
            // Business logic goes here
            return "Flash sale succeeded";
        });
    }
}
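2. Monitoring Virtual Threads
// ThreadMXBean and Thread.activeCount() report platform threads only; JDK 21 has no
// standard API that counts virtual threads. For those, use JFR events such as
// jdk.VirtualThreadStart, jdk.VirtualThreadEnd and jdk.VirtualThreadPinned.
@Component
public class ThreadMetrics {
    public ThreadMetrics(MeterRegistry registry) {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        // Gauges are registered once and sampled on every scrape, so no @Scheduled method is needed
        Gauge.builder("jvm.threads.platform.count", threadBean, ThreadMXBean::getThreadCount)
                .register(registry);
        Gauge.builder("jvm.threads.daemon.count", threadBean, ThreadMXBean::getDaemonThreadCount)
                .register(registry);
        Gauge.builder("jvm.threads.peak", threadBean, ThreadMXBean::getPeakThreadCount)
                .register(registry);
    }
}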
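Since the JDK does not expose a live count of virtual threads, one workable approximation is to count the tasks the application itself hands to its virtual-thread executor. The wrapper below is a sketch under that assumption; `CountingVirtualExecutor` is a hypothetical name.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical wrapper that tracks tasks submitted to a virtual-thread executor.
class CountingVirtualExecutor implements AutoCloseable {
    private final ExecutorService delegate = Executors.newVirtualThreadPerTaskExecutor();
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicLong completed = new AtomicLong();

    <T> Future<T> submit(Callable<T> task) {
        inFlight.incrementAndGet(); // counted from submission...
        try {
            return delegate.submit(() -> {
                try {
                    return task.call();
                } finally {         // ...until the task returns or throws
                    inFlight.decrementAndGet();
                    completed.incrementAndGet();
                }
            });
        } catch (RejectedExecutionException e) {
            inFlight.decrementAndGet(); // the task never started
            throw e;
        }
    }

    /** Tasks submitted but not yet finished. */
    int inFlight() {
        return inFlight.get();
    }

    /** Tasks that have finished, successfully or not. */
    long completed() {
        return completed.get();
    }

    @Override
    public void close() {
        delegate.close(); // waits for running tasks to finish
    }
}
```

Both counters can then be published as Micrometer gauges, for example with `Gauge.builder("flashsale.virtual.inflight", executor, CountingVirtualExecutor::inFlight)`, where the metric name is again only an illustration.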
VII. Benchmark Comparison
1. JMH Benchmark
// StructuredTaskScope is a preview API in Java 21: run the benchmark with --enable-preview
@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
public class VirtualThreadBenchmark {
    private static final ExecutorService virtualThreadExecutor =
            Executors.newVirtualThreadPerTaskExecutor();
    private static final ExecutorService threadPoolExecutor =
            Executors.newFixedThreadPool(200);
    @Param({"100", "10000", "100000"})
    private int requestCount;
    @Benchmark
    public void virtualThread() throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            for (int i = 0; i < requestCount; i++) {
                // Each fork() starts a new virtual thread
                scope.fork(() -> {
                    Thread.sleep(10); // simulate I/O
                    return null;
                });
            }
            scope.join();
        }
    }
    @Benchmark
    public void threadPool() throws Exception {
        CountDownLatch latch = new CountDownLatch(requestCount);
        for (int i = 0; i < requestCount; i++) {
            threadPoolExecutor.submit(() -> {
                try {
                    Thread.sleep(10); // simulate I/O
                } finally {
                    latch.countDown();
                }
                return null; // makes the lambda a Callable, so sleep() may throw
            });
        }
        latch.await();
    }
}
2. Analysis of Results
Concurrency model | Time for 10K requests | Memory usage | CPU utilization |
---|---|---|---|
Thread pool (200) | 5.8s | 450MB | 65% |
Virtual threads | 1.2s | 120MB | 85% |
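When reading figures like these, it can help to compare them with the theoretical floor for purely blocking tasks. The sketch below computes that floor from the 10 ms sleep used in the benchmark. `IdealBlockingTime` is a hypothetical helper, and real measurements will be higher because they include thread scheduling, warm-up, garbage collection, and harness overhead.

```java
// Hypothetical helper: lower bound on wall time for tasks that only block.
class IdealBlockingTime {

    /** Wall-time floor when each task blocks for latencyMillis and at most `workers` run at once. */
    static long idealMillis(long tasks, long workers, long latencyMillis) {
        long waves = (tasks + workers - 1) / workers; // ceil(tasks / workers)
        return waves * latencyMillis;
    }

    public static void main(String[] args) {
        System.out.println(idealMillis(10_000, 200, 10));    // 200-thread pool: prints 500
        System.out.println(idealMillis(10_000, 10_000, 10)); // one virtual thread per task: prints 10
    }
}
```

The gap between the two floors shows where the advantage comes from: a fixed pool processes blocking work in waves, while one virtual thread per task lets all the waits overlap.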
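VIII. Summary and Further Reading
This tutorial covered:
- Using Java virtual threads efficiently
- Structured concurrency in practice
- Reactive programming and backpressure control
- Distributed transaction solutions
- Monitoring high-concurrency systems
Directions for further study:
- Project Panama: calling native code
- Project Valhalla: value types
- GraalVM native image compilation
- The continuing evolution of Java's concurrency model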