Java并发新范式:基于虚拟线程的高性能Web服务架构实战
一、架构设计
基于Java19虚拟线程的Web服务方案,支持百万级并发连接,线程切换开销降低95%
二、核心实现
1. 虚拟线程HTTP服务器
// VirtualThreadServer.java
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
public class VirtualThreadServer {
private final ExecutorService virtualThreadExecutor;
private ServerSocket serverSocket;
public VirtualThreadServer(int port) throws IOException {
this.serverSocket = new ServerSocket(port);
this.virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();
}
public void start() {
System.out.println("Server started on port " + serverSocket.getLocalPort());
while (true) {
try {
Socket clientSocket = serverSocket.accept();
virtualThreadExecutor.execute(() -> handleRequest(clientSocket));
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleRequest(Socket clientSocket) {
try (BufferedReader in = new BufferedReader(
new InputStreamReader(clientSocket.getInputStream()));
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true)) {
// 模拟业务处理
Thread.sleep(100); // 虚拟线程在此挂起不会阻塞OS线程
out.println("HTTP/1.1 200 OK");
out.println("Content-Type: text/plain");
out.println();
out.println("Hello from virtual thread: "
+ Thread.currentThread().threadId());
} catch (Exception e) {
e.printStackTrace();
} finally {
try { clientSocket.close(); }
catch (IOException e) { e.printStackTrace(); }
}
}
public static void main(String[] args) throws IOException {
new VirtualThreadServer(8080).start();
}
}
2. 异步数据库访问
// AsyncDatabase.java
import java.sql.*;
import java.util.concurrent.*;
public class AsyncDatabase {
private final ExecutorService dbExecutor;
private final Connection connection;
public AsyncDatabase(String url, String user, String password)
throws SQLException {
this.connection = DriverManager.getConnection(url, user, password);
this.dbExecutor = Executors.newVirtualThreadPerTaskExecutor();
}
public CompletableFuture<ResultSet> queryAsync(String sql) {
return CompletableFuture.supplyAsync(() -> {
try (Statement stmt = connection.createStatement()) {
return stmt.executeQuery(sql);
} catch (SQLException e) {
throw new CompletionException(e);
}
}, dbExecutor);
}
public CompletableFuture<Integer> updateAsync(String sql) {
return CompletableFuture.supplyAsync(() -> {
try (Statement stmt = connection.createStatement()) {
return stmt.executeUpdate(sql);
} catch (SQLException e) {
throw new CompletionException(e);
}
}, dbExecutor);
}
}
三、高级特性
1. 结构化并发控制
// StructuredConcurrency.java
import java.util.concurrent.*;
public class StructuredConcurrency {
public static <T> T executeAll(StructuredTaskScope.ShutdownOnFailure scope,
Callable<T>... tasks) throws Exception {
for (var task : tasks) {
scope.fork(task);
}
scope.join();
scope.throwIfFailed();
return tasks[0].call(); // 返回第一个任务结果
}
public static void main(String[] args) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
String result = executeAll(scope,
() -> fetchFromDatabase("SELECT * FROM users"),
() -> callExternalService("https://api.example.com"),
() -> processLocalFiles("/data")
);
System.out.println("Final result: " + result);
}
}
}
2. 虚拟线程池监控
// ThreadMonitor.java
import java.lang.management.*;
import java.util.concurrent.*;
public class ThreadMonitor {
private final ScheduledExecutorService monitorExecutor;
private final ThreadMXBean threadBean;
public ThreadMonitor() {
this.threadBean = ManagementFactory.getThreadMXBean();
this.monitorExecutor = Executors.newSingleThreadScheduledExecutor();
}
public void startMonitoring() {
monitorExecutor.scheduleAtFixedRate(() -> {
System.out.println("Virtual threads: "
+ threadBean.getThreadCount());
System.out.println("Peak threads: "
+ threadBean.getPeakThreadCount());
}, 1, 1, TimeUnit.SECONDS);
}
}
四、完整案例
// OrderService.java
import java.util.concurrent.*;
public class OrderService {
private final AsyncDatabase db;
private final ExecutorService vtExecutor;
public OrderService(AsyncDatabase db) {
this.db = db;
this.vtExecutor = Executors.newVirtualThreadPerTaskExecutor();
}
public CompletableFuture<Order> processOrder(Order order) {
return CompletableFuture.supplyAsync(() -> {
try {
// 并发执行多个数据库操作
var inventoryCheck = db.updateAsync(
"UPDATE inventory SET stock = stock - "
+ order.quantity + " WHERE item_id = " + order.itemId);
var payment = db.updateAsync(
"INSERT INTO payments VALUES ("
+ order.amount + ", " + order.userId + ")");
var orderCreate = db.updateAsync(
"INSERT INTO orders VALUES ("
+ order.itemId + ", " + order.quantity + ")");
CompletableFuture.allOf(inventoryCheck, payment, orderCreate).join();
return order;
} catch (Exception e) {
throw new CompletionException(e);
}
}, vtExecutor);
}
}
// 使用示例
public static void main(String[] args) throws Exception {
AsyncDatabase db = new AsyncDatabase("jdbc:mysql://localhost:3306/db",
"user", "password");
OrderService service = new OrderService(db);
Order order = new Order(123, 456, 2, 99.99);
service.processOrder(order)
.thenAccept(o -> System.out.println("Order processed: " + o))
.exceptionally(e -> { e.printStackTrace(); return null; });
}