Java虚拟线程实战:高并发服务架构新范式
一、虚拟线程核心原理
Java 19引入的虚拟线程(Virtual Threads)是Loom项目的核心成果,通过轻量级用户态线程实现百万级并发:
// 基础虚拟线程创建
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
IntStream.range(0, 10_000).forEach(i -> {
executor.submit(() -> {
Thread.sleep(Duration.ofSeconds(1));
System.out.println(i);
return i;
});
});
} // 自动关闭executor
// 与传统线程对比
Thread.ofVirtual().name("virtual-thread-", 0)
.start(() -> System.out.println("Virtual Thread"));
Thread.ofPlatform().name("platform-thread-", 0)
.start(() -> System.out.println("Platform Thread"));
技术优势: 1:1000的线程内存占用比 | 毫秒级启动时间 | 自动负载均衡
二、高并发服务实战
1. HTTP服务优化
// 基于虚拟线程的HTTP服务器
public class VirtualThreadHttpServer {
public static void main(String[] args) throws IOException {
var server = HttpServer.create(
new InetSocketAddress(8080),
0 // 使用虚拟线程自动扩展
);
server.createContext("/api", exchange -> {
Thread.sleep(1000); // 模拟IO操作
var response = "Hello from virtual thread: "
+ Thread.currentThread();
exchange.sendResponseHeaders(200, response.length());
try (var os = exchange.getResponseBody()) {
os.write(response.getBytes());
}
});
server.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
server.start();
}
}
2. 数据库连接池优化
// 虚拟线程友好的连接池
public class VirtualThreadConnectionPool {
private final BlockingQueue<Connection> pool;
public VirtualThreadConnectionPool(int size, Supplier<Connection> factory) {
this.pool = new LinkedBlockingQueue(size);
IntStream.range(0, size)
.forEach(i -> pool.add(factory.get()));
}
public <T> T execute(Function<Connection, T> operation) {
try (var conn = pool.take()) {
return operation.apply(conn);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
pool.add(conn);
}
}
}
// 使用示例
var pool = new VirtualThreadConnectionPool(10, MySQLConnection::new);
var result = pool.execute(conn ->
conn.createStatement().executeQuery("SELECT * FROM users"));
三、电商系统秒杀案例
public class SeckillService {
private final ConcurrentHashMap<Long, AtomicInteger> inventory;
private final VirtualThreadConnectionPool dbPool;
public SeckillService() {
this.inventory = initInventory();
this.dbPool = new VirtualThreadConnectionPool(20, this::createConnection);
}
public CompletableFuture<Boolean> purchase(long userId, long itemId) {
return CompletableFuture.supplyAsync(() -> {
// 内存原子校验
var stock = inventory.get(itemId);
if (stock.decrementAndGet() {
try (var stmt = conn.prepareStatement(
"UPDATE items SET stock = stock - 1 WHERE id = ? AND stock > 0")) {
stmt.setLong(1, itemId);
return stmt.executeUpdate() > 0;
}
});
}, Executors.newVirtualThreadPerTaskExecutor());
}
// 模拟10万并发请求
public void simulateConcurrentRequests() {
var successCount = new AtomicInteger();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
IntStream.range(0, 100_000).forEach(i -> {
executor.submit(() -> {
if (purchase(i % 10_000, 1).join()) {
successCount.incrementAndGet();
}
});
});
}
System.out.println("秒杀成功数: " + successCount.get());
}
}
四、生产环境调优指南
- 线程池配置:
-Djdk.virtualThreadScheduler.parallelism=CPU核心数
- 监控指标:通过JMX监控虚拟线程创建/销毁速率
- 异常处理:为关键任务设置
Thread.setUncaughtExceptionHandler
- 调试技巧:使用
jcmd <pid> Thread.dump_to_file -format=json
- 兼容性:识别阻塞操作并用
jdk.tracePinnedThreads
标记