第一部分:虚拟线程核心原理与基础实践
1.1 虚拟线程与传统线程的对比
虚拟线程(Virtual Threads)是Java 19引入的预览特性,在Java 21中正式发布。与传统平台线程相比,虚拟线程在内存使用和创建开销上有显著优势:
传统线程(不推荐用于高并发)
// Create 1000 platform threads - each one is backed by its own OS thread.
for (int i = 0; i < 1000; i++) {
    new Thread(() -> {
        try {
            Thread.sleep(1000); // simulate a blocking I/O operation
            System.out.println("Task completed");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can see it.
            Thread.currentThread().interrupt();
        }
    }).start();
}
// Problem: every platform thread reserves its own stack (roughly 1 MB by
// default, tunable with -Xss), so 1000 threads reserve on the order of 1 GB.
虚拟线程(推荐用于IO密集型)
// Create one million virtual threads - the per-thread memory cost is tiny.
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    for (int i = 0; i < 1_000_000; i++) {
        executor.submit(() -> {
            try {
                Thread.sleep(1000); // a blocked virtual thread is parked and frees its carrier
                System.out.println("Virtual task completed");
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can see it.
                Thread.currentThread().interrupt();
            }
        });
    }
} // leaving the try block calls close(), which waits for all submitted tasks
// Advantage: million-level concurrency with a memory footprint of a few hundred MB.
1.2 虚拟线程的创建与管理
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Shows the three ways of starting virtual threads: the one-shot
 * {@code Thread.startVirtualThread}, a configurable {@code Thread.Builder},
 * and a thread-per-task executor.
 */
public class VirtualThreadDemo {

    // Number of tasks submitted by useVirtualThreadExecutor().
    // NOTE(review): the loop bound was lost from the original listing;
    // 10_000 is a placeholder - confirm the intended value.
    private static final int TASK_COUNT = 10_000;

    /** Option 1: start a virtual thread directly with Thread.startVirtualThread(). */
    public void createVirtualThreadSimple() {
        Thread.startVirtualThread(() ->
            System.out.println("Running in virtual thread: "
                + Thread.currentThread()));
    }

    /** Option 2: configure the thread through a Thread.Builder before starting it. */
    public void createVirtualThreadWithBuilder() {
        Thread.Builder builder = Thread.ofVirtual()
            .name("virtual-thread-", 0) // names are numbered automatically from 0
            .inheritInheritableThreadLocals(false); // explicitly do not inherit
        builder.start(this::performIOOperation);
    }

    /**
     * Option 3: submit many tasks to a virtual-thread-per-task executor and
     * wait until all of them have finished.
     */
    public void useVirtualThreadExecutor() {
        List<Future<String>> futures = new ArrayList<>(TASK_COUNT);
        // try-with-resources: close() performs an orderly shutdown and waits
        // for every submitted task, so no explicit awaitTermination is needed.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < TASK_COUNT; i++) {
                futures.add(executor.submit(
                    () -> processTask(Thread.currentThread().toString())));
            }
        }
        // Every future in 'futures' is complete at this point.
    }

    /** Simulates a sequence of blocking I/O calls, which virtual threads handle well. */
    private void performIOOperation() {
        try {
            Thread.sleep(100); // simulated database query
            Thread.sleep(200); // simulated HTTP call
            Thread.sleep(50);  // simulated file operation
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /** Builds the result string for a task from the description of the thread that ran it. */
    private String processTask(String threadInfo) {
        return "Processed by: " + threadInfo;
    }
}
第二部分:结构化并发编程模型
2.1 StructuredTaskScope 核心概念
结构化并发(Structured Concurrency)确保并发操作具有明确的生命周期和错误传播机制:
import java.util.concurrent.*;
import java.util.concurrent.StructuredTaskScope.*;
/**
 * Structured-concurrency examples built on {@code StructuredTaskScope}.
 *
 * <p>Uses the {@code ShutdownOnFailure} flavour of the API, which is a preview
 * feature in JDK 21 and must be compiled and run with {@code --enable-preview}.
 */
public class StructuredConcurrencyDemo {

    /**
     * Loads user info, orders and preferences concurrently.
     *
     * @param userId id of the user to load
     * @return the combined result of all three subtasks
     * @throws ExecutionException   if any subtask failed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Response fetchUserData(String userId) throws ExecutionException, InterruptedException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Subtask<UserInfo> userSubtask = scope.fork(() -> fetchUserInfo(userId));
            Subtask<List<Order>> ordersSubtask = scope.fork(() -> fetchUserOrders(userId));
            Subtask<UserPreferences> prefsSubtask = scope.fork(() -> fetchUserPreferences(userId));

            scope.join();          // wait until all subtasks finish or one fails
            scope.throwIfFailed(); // surface the first failure as ExecutionException

            return new Response(
                userSubtask.get(),
                ordersSubtask.get(),
                prefsSubtask.get()
            );
        }
    }

    /**
     * Same as {@link #fetchUserData(String)} but gives up after {@code timeout}.
     *
     * @param userId  id of the user to load
     * @param timeout maximum time to wait for all subtasks
     * @return the combined result of all three subtasks
     * @throws ExecutionException   if any subtask failed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException     if the deadline passes before the subtasks finish
     */
    public Response fetchUserDataWithTimeout(String userId, Duration timeout)
            throws ExecutionException, InterruptedException, TimeoutException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Subtask<UserInfo> userSubtask = scope.fork(() -> fetchUserInfo(userId));
            Subtask<List<Order>> ordersSubtask = scope.fork(() -> fetchUserOrders(userId));
            Subtask<UserPreferences> prefsSubtask = scope.fork(() -> fetchUserPreferences(userId));

            // joinUntil itself throws TimeoutException when the deadline expires,
            // so a subtask failure is reported as a failure, not as a timeout.
            scope.joinUntil(Instant.now().plus(timeout));
            scope.throwIfFailed();

            return new Response(
                userSubtask.get(),
                ordersSubtask.get(),
                prefsSubtask.get()
            );
        }
    }

    /**
     * Loads user info and orders concurrently and returns whatever succeeded.
     *
     * @param userId id of the user to load
     * @return the available data; {@link PartialResult#empty()} if interrupted
     */
    public PartialResult fetchDataWithFallback(String userId) {
        // A plain scope (no shutdown policy) lets every subtask run to completion;
        // ShutdownOnFailure would cancel the sibling as soon as one subtask fails.
        try (var scope = new StructuredTaskScope<Object>()) {
            Subtask<UserInfo> userTask = scope.fork(() -> fetchUserInfo(userId));
            Subtask<List<Order>> ordersTask = scope.fork(() -> fetchUserOrders(userId));

            scope.join();

            return new PartialResult(resultOf(userTask), resultOf(ordersTask));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return PartialResult.empty();
        }
    }

    /** Returns the subtask's result if it completed successfully, otherwise empty. */
    private static <T> Optional<T> resultOf(Subtask<T> task) {
        return task.state() == Subtask.State.SUCCESS
            ? Optional.ofNullable(task.get())
            : Optional.empty();
    }

    // Simulated data sources

    private UserInfo fetchUserInfo(String userId) {
        // simulated database query
        return new UserInfo(userId, "User " + userId);
    }

    private List<Order> fetchUserOrders(String userId) {
        // simulated API call
        return List.of(new Order("order1"), new Order("order2"));
    }

    private UserPreferences fetchUserPreferences(String userId) {
        // simulated cache lookup
        return new UserPreferences("default");
    }

    // Data carriers

    record Response(UserInfo user, List<Order> orders, UserPreferences preferences) {}

    record PartialResult(Optional<UserInfo> user, Optional<List<Order>> orders) {
        static PartialResult empty() {
            return new PartialResult(Optional.empty(), Optional.empty());
        }
    }

    record UserInfo(String id, String name) {}

    record Order(String id) {}

    record UserPreferences(String theme) {}
}
第三部分:实战案例:基于虚拟线程的异步API网关
3.1 网关架构设计
构建一个高性能API网关,支持请求转发、限流、熔断和监控:
import java.net.http.*;
import java.time.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
/**
 * API gateway that handles every request on its own virtual thread and
 * protects the backend with a token-bucket rate limiter and a circuit breaker.
 */
public class VirtualThreadApiGateway {
    private final HttpClient httpClient;
    private final ExecutorService httpClientExecutor;
    private final ExecutorService virtualThreadExecutor;
    private final TokenBucketRateLimiter rateLimiter;
    private final CircuitBreaker circuitBreaker;
    private final MetricsCollector metrics;

    public VirtualThreadApiGateway() {
        // The HTTP client gets its own virtual-thread executor; the reference is
        // kept so that shutdown() can release it.
        this.httpClientExecutor = Executors.newVirtualThreadPerTaskExecutor();
        this.httpClient = HttpClient.newBuilder()
            .executor(httpClientExecutor)
            .connectTimeout(Duration.ofSeconds(5))
            .build();
        this.virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();
        this.rateLimiter = new TokenBucketRateLimiter(1000, 1000); // 1000 QPS
        this.circuitBreaker = new CircuitBreaker(5, Duration.ofSeconds(30));
        this.metrics = new MetricsCollector();
    }

    /**
     * Handles one request asynchronously on a virtual thread.
     *
     * @param request the request to forward
     * @return a future that always completes normally; failures are mapped to
     *         429 (rate limited), 503 (circuit open) or 500 (error) responses
     */
    public CompletableFuture<GatewayResponse> handleRequest(GatewayRequest request) {
        return CompletableFuture.supplyAsync(() -> {
            long startTime = System.nanoTime();
            try {
                // 1. Rate-limit check
                if (!rateLimiter.tryAcquire()) {
                    metrics.recordRejection();
                    return GatewayResponse.rateLimited();
                }
                // 2. Circuit-breaker check
                if (!circuitBreaker.allowRequest()) {
                    metrics.recordCircuitOpen();
                    return GatewayResponse.circuitOpen();
                }
                // 3. Forward to the backend
                HttpResponse<String> backendResponse = forwardToBackend(request);
                // 4. Feed the outcome back into the circuit breaker
                if (backendResponse.statusCode() >= 500) {
                    circuitBreaker.recordFailure();
                } else {
                    circuitBreaker.recordSuccess();
                }
                // NOTE(review): 5xx responses are still counted as metric successes,
                // as in the original listing - confirm whether that is intended.
                metrics.recordSuccess(System.nanoTime() - startTime);
                return GatewayResponse.success(backendResponse);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interrupt visible to the executor that owns this thread.
                    Thread.currentThread().interrupt();
                }
                circuitBreaker.recordFailure();
                metrics.recordFailure(e);
                return GatewayResponse.error(e);
            }
        }, virtualThreadExecutor);
    }

    /**
     * Handles a batch of requests concurrently and returns the responses in
     * request order.
     *
     * <p>Each request already runs on its own virtual thread inside
     * {@link #handleRequest}, so the batch only has to start all of them and
     * then wait; no extra thread per request is needed.
     *
     * @param requests the requests to process
     * @return one response per request, in the same order
     * @throws GatewayException if the batch could not be processed
     */
    public List<GatewayResponse> handleBatchRequests(List<GatewayRequest> requests) {
        try {
            List<CompletableFuture<GatewayResponse>> pending = new ArrayList<>(requests.size());
            for (GatewayRequest request : requests) {
                pending.add(handleRequest(request)); // start everything first
            }
            List<GatewayResponse> responses = new ArrayList<>(pending.size());
            for (CompletableFuture<GatewayResponse> future : pending) {
                responses.add(future.join()); // then wait in request order
            }
            return responses;
        } catch (RuntimeException e) {
            throw new GatewayException("Batch processing failed", e);
        }
    }

    /** Sends the request to its backend URL and returns the backend's response. */
    private HttpResponse<String> forwardToBackend(GatewayRequest request)
            throws IOException, InterruptedException {
        // Requests without a body (e.g. GET) must not hit BodyPublishers.ofString(null).
        HttpRequest.BodyPublisher bodyPublisher = request.body() == null
            ? HttpRequest.BodyPublishers.noBody()
            : HttpRequest.BodyPublishers.ofString(request.body());
        HttpRequest httpRequest = HttpRequest.newBuilder()
            .uri(URI.create(request.backendUrl()))
            .header("Content-Type", "application/json")
            .method(request.method(), bodyPublisher)
            .timeout(Duration.ofSeconds(10))
            .build();
        return httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofString());
    }

    /**
     * Graceful shutdown: stop accepting requests, wait up to 30 seconds for
     * in-flight ones, then release the HTTP executor and the refill timer.
     */
    public void shutdown() {
        virtualThreadExecutor.shutdown();
        try {
            if (!virtualThreadExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
                virtualThreadExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            virtualThreadExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        } finally {
            httpClientExecutor.shutdownNow();
            rateLimiter.close();
        }
    }

    /** Unchecked exception thrown when a batch cannot be processed. */
    static class GatewayException extends RuntimeException {
        GatewayException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Token bucket that is topped up once per second by a background timer. */
    static class TokenBucketRateLimiter {
        private final AtomicLong tokens;
        private final long capacity;
        private final long refillRate; // tokens per second
        private final ScheduledExecutorService scheduler;

        TokenBucketRateLimiter(long capacity, long refillRate) {
            this.capacity = capacity;
            this.refillRate = refillRate;
            this.tokens = new AtomicLong(capacity);
            this.scheduler = startRefillThread();
        }

        /**
         * Takes one token if any is available.
         *
         * @return true if a token was taken, false if the bucket is empty
         */
        boolean tryAcquire() {
            // Retry on a lost CAS: losing a race with another caller must not be
            // reported as "rate limited" while tokens are still available.
            while (true) {
                long current = tokens.get();
                if (current <= 0) {
                    return false;
                }
                if (tokens.compareAndSet(current, current - 1)) {
                    return true;
                }
            }
        }

        /** Stops the background refill timer. */
        void close() {
            scheduler.shutdownNow();
        }

        private ScheduledExecutorService startRefillThread() {
            // Daemon thread so a forgotten limiter cannot keep the JVM alive.
            ScheduledExecutorService refiller = Executors.newSingleThreadScheduledExecutor(task -> {
                Thread thread = new Thread(task, "rate-limiter-refill");
                thread.setDaemon(true);
                return thread;
            });
            refiller.scheduleAtFixedRate(
                () -> tokens.updateAndGet(current -> Math.min(capacity, current + refillRate)),
                1, 1, TimeUnit.SECONDS);
            return refiller;
        }
    }

    /**
     * Circuit breaker that opens after {@code failureThreshold} consecutive
     * failures and lets traffic through again once {@code resetTimeout} has
     * passed since the last failure.
     */
    static class CircuitBreaker {
        private final AtomicInteger failureCount = new AtomicInteger();
        private final int failureThreshold;
        private final Duration resetTimeout;
        private volatile long lastFailureTime = 0;
        private volatile State state = State.CLOSED;

        enum State { CLOSED, OPEN, HALF_OPEN }

        CircuitBreaker(int failureThreshold, Duration resetTimeout) {
            this.failureThreshold = failureThreshold;
            this.resetTimeout = resetTimeout;
        }

        /**
         * @return false only while the breaker is OPEN and the reset timeout has
         *         not yet elapsed; moves OPEN to HALF_OPEN once it has
         */
        boolean allowRequest() {
            if (state == State.OPEN) {
                if (System.currentTimeMillis() - lastFailureTime >
                        resetTimeout.toMillis()) {
                    state = State.HALF_OPEN;
                    return true;
                }
                return false;
            }
            return true;
        }

        /** Records a successful call: closes a half-open breaker and clears the failure streak. */
        void recordSuccess() {
            if (state == State.HALF_OPEN) {
                state = State.CLOSED;
            }
            // Reset on every success so only consecutive failures trip the breaker;
            // otherwise any failureThreshold failures ever seen would open it.
            failureCount.set(0);
        }

        /** Records a failed call and opens the breaker when the threshold is reached. */
        void recordFailure() {
            int failures = failureCount.incrementAndGet();
            lastFailureTime = System.currentTimeMillis();
            // A failed probe in HALF_OPEN re-opens immediately.
            if (state == State.HALF_OPEN || failures >= failureThreshold) {
                state = State.OPEN;
            }
        }
    }

    /** Thread-safe counters for request outcomes. */
    static class MetricsCollector {
        private final AtomicLong successCount = new AtomicLong();
        private final AtomicLong failureCount = new AtomicLong();
        private final AtomicLong totalLatency = new AtomicLong();
        private final AtomicLong rejectionCount = new AtomicLong();
        private final AtomicLong circuitOpenCount = new AtomicLong();

        void recordSuccess(long latencyNanos) {
            successCount.incrementAndGet();
            totalLatency.addAndGet(latencyNanos);
        }

        void recordFailure(Exception e) {
            failureCount.incrementAndGet();
        }

        void recordRejection() {
            rejectionCount.incrementAndGet();
        }

        /** Counts a request that was refused because the circuit breaker was open. */
        void recordCircuitOpen() {
            circuitOpenCount.incrementAndGet();
        }

        /** @return how many requests were refused because the circuit breaker was open */
        long circuitOpenCount() {
            return circuitOpenCount.get();
        }

        /** @return a snapshot of the counters; average latency is in milliseconds */
        MetricsSnapshot getSnapshot() {
            long successes = successCount.get();
            long totalLat = totalLatency.get();
            double avgLatency = successes > 0 ?
                (totalLat / 1_000_000.0) / successes : 0.0;
            return new MetricsSnapshot(
                successes,
                failureCount.get(),
                rejectionCount.get(),
                avgLatency
            );
        }
    }

    // Data carriers

    record GatewayRequest(String backendUrl, String method, String body) {}

    record GatewayResponse(int statusCode, String body, Map<String, List<String>> headers) {
        static GatewayResponse rateLimited() {
            return new GatewayResponse(429, "Rate limited", Map.of());
        }

        static GatewayResponse circuitOpen() {
            return new GatewayResponse(503, "Circuit breaker open", Map.of());
        }

        static GatewayResponse success(HttpResponse<String> response) {
            return new GatewayResponse(
                response.statusCode(),
                response.body(),
                response.headers().map()
            );
        }

        static GatewayResponse error(Exception e) {
            return new GatewayResponse(500, "Internal error: " + e.getMessage(), Map.of());
        }
    }

    record MetricsSnapshot(long successCount, long failureCount,
                           long rejectionCount, double avgLatencyMs) {}
}
第四部分:性能优化与监控
4.1 虚拟线程监控与调优(虚拟线程无需池化)
import java.lang.management.*;
import java.util.concurrent.*;
/**
 * Monitoring and executor helpers for applications that run on virtual threads.
 */
public class VirtualThreadMonitor {

    /**
     * Prints a report every 5 seconds with the number of tracked active
     * virtual threads and the number of carrier threads.
     *
     * <p>{@code ThreadMXBean} only reports platform threads and
     * {@code ThreadInfo} has no notion of "virtual", so the virtual-thread
     * figure comes from {@code VirtualThreadMetrics}, i.e. it covers threads
     * created through {@link #createOptimizedExecutor()} only.
     */
    public void monitorVirtualThreads() {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        // Daemon thread so the reporter never keeps the JVM alive on its own.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(task -> {
            Thread reporter = new Thread(task, "virtual-thread-monitor");
            reporter.setDaemon(true);
            return reporter;
        });
        scheduler.scheduleAtFixedRate(() -> {
            long virtualThreadCount = VirtualThreadMetrics.getMetrics().activeCount();

            // Carrier threads are platform threads and therefore visible here.
            // NOTE(review): the name prefix also matches other ForkJoinPool
            // workers (e.g. the common pool) - confirm this is acceptable.
            int carrierThreadCount = 0;
            for (ThreadInfo info : threadBean.getThreadInfo(threadBean.getAllThreadIds())) {
                // Entries are null for threads that died between the two calls.
                if (info != null && info.getThreadName().startsWith("ForkJoinPool")) {
                    carrierThreadCount++;
                }
            }

            System.out.printf("""
                虚拟线程监控报告:
                活跃虚拟线程数: %d
                载体线程数: %d
                虚拟线程/载体线程比例: %.2f:1
                %n""",
                virtualThreadCount,
                carrierThreadCount,
                (double) virtualThreadCount / Math.max(1, carrierThreadCount)
            );
        }, 0, 5, TimeUnit.SECONDS);
    }

    /**
     * Creates an executor that starts one named virtual thread per task and
     * records start/success/failure in {@code VirtualThreadMetrics}.
     *
     * <p>Virtual threads are cheap and are not meant to be pooled, so this uses
     * a thread-per-task executor instead of a {@code ThreadPoolExecutor} with
     * a keep-alive time.
     *
     * @return a thread-per-task executor backed by instrumented virtual threads
     */
    public ExecutorService createOptimizedExecutor() {
        ThreadFactory virtualThreadFactory = Thread.ofVirtual()
            .name("vthread-", 0)
            .uncaughtExceptionHandler((thread, throwable) -> {
                // custom handling of exceptions that escape a task
                System.err.println("Virtual thread error: " + thread.getName());
                throwable.printStackTrace();
            })
            .factory();

        // Wrap each task so the metrics are updated around its execution.
        ThreadFactory instrumentedFactory = task -> virtualThreadFactory.newThread(() -> {
            VirtualThreadMetrics.recordThreadStart();
            try {
                task.run();
            } catch (RuntimeException | Error failure) {
                VirtualThreadMetrics.recordFailure();
                throw failure; // still reaches the uncaught-exception handler
            }
            VirtualThreadMetrics.recordSuccess();
        });

        return Executors.newThreadPerTaskExecutor(instrumentedFactory);
    }

    /**
     * Executor decorator that rejects new tasks while heap usage is above a
     * threshold.
     */
    public static class MemoryAwareExecutor implements Executor {
        private final Executor delegate;
        private final long memoryThreshold; // bytes
        private final MemoryMXBean memoryBean;

        /**
         * @param delegate          executor that actually runs the tasks
         * @param memoryThresholdMB heap usage limit in megabytes
         */
        public MemoryAwareExecutor(Executor delegate, long memoryThresholdMB) {
            if (delegate == null) {
                throw new NullPointerException("delegate");
            }
            this.delegate = delegate;
            this.memoryThreshold = memoryThresholdMB * 1024 * 1024;
            this.memoryBean = ManagementFactory.getMemoryMXBean();
        }

        /**
         * Runs the task on the delegate unless heap usage stays above the
         * threshold after a GC hint and a short wait.
         *
         * @throws RejectedExecutionException if heap usage is still above the threshold
         */
        @Override
        public void execute(Runnable command) {
            MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage();
            if (heapUsage.getUsed() > memoryThreshold) {
                // "Used" includes garbage that has not been collected yet, so
                // hint a GC and re-check before rejecting.
                System.gc();
                try {
                    Thread.sleep(100); // brief wait
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                heapUsage = memoryBean.getHeapMemoryUsage();
                if (heapUsage.getUsed() > memoryThreshold) {
                    throw new RejectedExecutionException(
                        "Memory threshold exceeded: " +
                        (heapUsage.getUsed() / 1024 / 1024) + "MB used"
                    );
                }
            }
            delegate.execute(command);
        }
    }
}
/**
 * Process-wide counters for virtual-thread task execution: how many tasks
 * started, how many ended successfully or with a failure, and how many are
 * currently running.
 */
class VirtualThreadMetrics {
    private static final AtomicLong STARTED = new AtomicLong();
    private static final AtomicLong SUCCEEDED = new AtomicLong();
    private static final AtomicLong FAILED = new AtomicLong();
    private static final LongAdder IN_FLIGHT = new LongAdder();

    /** Counts a task start and marks one more task as active. */
    static void recordThreadStart() {
        IN_FLIGHT.increment();
        STARTED.incrementAndGet();
    }

    /** Counts a successful completion and marks one task as no longer active. */
    static void recordSuccess() {
        finish(SUCCEEDED);
    }

    /** Counts a failed completion and marks one task as no longer active. */
    static void recordFailure() {
        finish(FAILED);
    }

    /** @return the current values of all counters */
    static Metrics getMetrics() {
        long starts = STARTED.get();
        long ok = SUCCEEDED.get();
        long failed = FAILED.get();
        long active = IN_FLIGHT.sum();
        return new Metrics(starts, ok, failed, active);
    }

    // Bumps the given outcome counter and releases one active slot.
    private static void finish(AtomicLong outcomeCounter) {
        outcomeCounter.incrementAndGet();
        IN_FLIGHT.decrement();
    }

    /** Immutable view of the counters at one point in time. */
    record Metrics(long totalStarts, long totalSuccesses,
                   long totalFailures, long activeCount) {}
}
第五部分:生产环境部署与最佳实践
5.1 Spring Boot 3集成配置
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.concurrent.*;
/**
 * Spring Boot entry point that exposes a virtual-thread executor and a
 * WebClient as beans.
 */
@SpringBootApplication
@EnableAsync
public class VirtualThreadApplication {

    public static void main(String[] args) {
        SpringApplication.run(VirtualThreadApplication.class, args);
    }

    /**
     * Executor that starts one virtual thread per task.
     *
     * <p>The "taskExecutor" alias is what lets {@code @EnableAsync} use this
     * bean: Spring's async support looks for a unique {@code TaskExecutor}
     * bean or, failing that, an {@code Executor} bean named "taskExecutor".
     * Without the alias {@code @Async} methods would fall back to a default
     * executor instead of virtual threads.
     */
    @Bean(name = {"virtualThreadTaskExecutor", "taskExecutor"})
    public Executor virtualThreadTaskExecutor() {
        return Executors.newVirtualThreadPerTaskExecutor();
    }

    /** WebClient backed by a Reactor Netty connector with a dedicated event-loop group. */
    @Bean
    public WebClient webClient() {
        return WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(
                HttpClient.create()
                    .runOn(LoopResources.create("webclient-vthread", 1, true))
            ))
            .build();
    }
}
// 控制器示例
@RestController
@RequestMapping("/api")
class ApiController {
private final Executor virtualThreadExecutor;
private final WebClient webClient;
public ApiController(Executor virtualThreadExecutor, WebClient webClient) {
this.virtualThreadExecutor = virtualThreadExecutor;
this.webClient = webClient;
}
@GetMapping("/users/{id}")
public CompletableFuture getUser(@PathVariable String id) {
return CompletableFuture.supplyAsync(() -> {
// 并行获取用户信息
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var userTask = scope.fork(() ->
fetchUserFromService(id)
);
var ordersTask = scope.fork(() ->
fetchOrdersFromService(id)
);
var preferencesTask = scope.fork(() ->
fetchPreferencesFromService(id)
);
scope.join();
scope.throwIfFailed();
return new UserResponse(
userTask.get(),
ordersTask.get(),
preferencesTask.get()
);
} catch (Exception e) {
throw new RuntimeException("Failed to fetch user data", e);
}
}, virtualThreadExecutor);
}
@PostMapping("/batch")
public CompletableFuture<List> processBatch(
@RequestBody BatchRequest request) {
return CompletableFuture.supplyAsync(() -> {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
List<Subtask> subtasks = request.userIds()
.stream()
.map(userId -> scope.fork(() -> getUser(userId).get()))
.collect(Collectors.toList());
scope.join();
scope.throwIfFailed();
return subtasks.stream()
.map(Subtask::get)
.collect(Collectors.toList());
} catch (Exception e) {
throw new RuntimeException("Batch processing failed", e);
}
}, virtualThreadExecutor);
}
// 异步服务调用
private UserInfo fetchUserFromService(String userId) {
return webClient.get()
.uri("/users-service/users/{id}", userId)
.retrieve()
.bodyToMono(UserInfo.class)
.block();
}
private List fetchOrdersFromService(String userId) {
return webClient.get()
.uri("/orders-service/users/{id}/orders", userId)
.retrieve()
.bodyToFlux(Order.class)
.collectList()
.block();
}
private UserPreferences fetchPreferencesFromService(String userId) {
return webClient.get()
.uri("/preferences-service/users/{id}/preferences", userId)
.retrieve()
.bodyToMono(UserPreferences.class)
.block();
}
// 记录类
record BatchRequest(List userIds) {}
record UserResponse(UserInfo user, List orders, UserPreferences preferences) {}
record UserInfo(String id, String name, String email) {}
record Order(String id, BigDecimal amount) {}
record UserPreferences(String theme, String language) {}
}
5.2 最佳实践总结
✅ 推荐使用场景
- 高并发IO密集型应用(HTTP服务器、API网关)
- 微服务间的异步通信
- 数据库访问等阻塞式调用(连接数仍由连接池或信号量限制,虚拟线程本身无需池化)
- 文件处理流水线
- 实时数据处理
❌ 避免使用场景
- CPU密集型计算任务
- 需要线程本地存储的复杂场景
- 同步锁竞争激烈的场景(JDK 21~23中在synchronized块内阻塞会固定载体线程,可改用ReentrantLock)
- 需要精确线程控制的场景
⚠️ 注意事项
- 避免在虚拟线程中用ThreadLocal缓存昂贵对象(虚拟线程数量多且不复用,缓存难以命中且占用内存)
- 注意线程池的关闭和资源清理
- 监控虚拟线程的创建和销毁频率
- 合理设置载体线程池大小(默认等于CPU核心数,可通过 -Djdk.virtualThreadScheduler.parallelism 调整)
- 注意异常传播和错误处理
// Page interaction features
document.addEventListener('DOMContentLoaded', function() {
    // Add a copy button and a language label to every code block
    const codeBlocks = document.querySelectorAll('pre code');
    codeBlocks.forEach(block => {
        // Copy button
        const copyButton = document.createElement('button');
        copyButton.textContent = '复制';
        copyButton.className = 'copy-btn';
        copyButton.style.cssText = `
            position: absolute;
            top: 5px;
            right: 5px;
            background: #2c3e50;
            color: white;
            border: none;
            padding: 4px 8px;
            border-radius: 3px;
            cursor: pointer;
            font-size: 12px;
        `;
        // The <pre> must be positioned so the absolute children anchor to it
        const pre = block.parentElement;
        pre.style.position = 'relative';
        pre.appendChild(copyButton);
        copyButton.addEventListener('click', async () => {
            try {
                await navigator.clipboard.writeText(block.textContent);
                copyButton.textContent = '已复制!';
                // Revert the label after two seconds
                setTimeout(() => {
                    copyButton.textContent = '复制';
                }, 2000);
            } catch (err) {
                console.error('复制失败:', err);
            }
        });
        // Language label; falls back to 'java' when the block has no class
        const language = block.className || 'java';
        const langLabel = document.createElement('span');
        langLabel.textContent = language;
        langLabel.style.cssText = `
            position: absolute;
            top: 5px;
            left: 5px;
            background: #34495e;
            color: white;
            padding: 2px 6px;
            border-radius: 3px;
            font-size: 10px;
            font-family: monospace;
        `;
        pre.appendChild(langLabel);
    });

    // Highlight the nav link of the section that is at least half visible
    const sections = document.querySelectorAll('section');
    const navLinks = document.querySelectorAll('nav a');
    const observer = new IntersectionObserver(
        (entries) => {
            entries.forEach(entry => {
                if (entry.isIntersecting) {
                    const id = entry.target.id;
                    navLinks.forEach(link => {
                        link.style.fontWeight =
                            link.getAttribute('href') === `#${id}` ?
                            'bold' : 'normal';
                    });
                }
            });
        },
        { threshold: 0.5 }
    );
    sections.forEach(section => observer.observe(section));

    // Back-to-top button, shown after scrolling down more than 300px
    const backToTop = document.createElement('button');
    backToTop.textContent = '↑';
    backToTop.style.cssText = `
        position: fixed;
        bottom: 20px;
        right: 20px;
        width: 40px;
        height: 40px;
        border-radius: 50%;
        background: #3498db;
        color: white;
        border: none;
        font-size: 20px;
        cursor: pointer;
        display: none;
        z-index: 1000;
    `;
    document.body.appendChild(backToTop);
    window.addEventListener('scroll', () => {
        // scrollY is the standard name; pageYOffset is a legacy alias of it
        backToTop.style.display =
            window.scrollY > 300 ? 'block' : 'none';
    });
    backToTop.addEventListener('click', () => {
        window.scrollTo({ top: 0, behavior: 'smooth' });
    });
});

