Java 21 虚拟线程深度实战:从平台线程到千万级并发的完整迁移指南

2026-06-05 0 904

在 Java 21 正式发布后,虚拟线程(Virtual Threads) 成为最受瞩目的特性之一。作为 Project Loom 的核心成果,虚拟线程从根本上改变了 Java 的并发编程模型,使得开发者可以用熟悉的同步阻塞风格编写代码,却能获得接近异步框架的吞吐量。本文将深入解析虚拟线程的工作原理,通过多个完整的实战案例展示如何将传统的平台线程应用迁移到虚拟线程上,并解决迁移过程中遇到的连接池、线程局部变量和资源限制等实际问题。

一、平台线程的困境与虚拟线程的诞生

在传统的 Java 并发模型中,每个 java.lang.Thread 实例都对应操作系统的一个内核线程(在大多数 JVM 实现中)。这种一对一映射带来了几个严重问题:

  • 内存开销巨大:每个平台线程默认占用约 1MB 的栈空间,创建 1 万个线程就需要约 10GB 内存,根本无法支撑高并发场景。
  • 上下文切换昂贵:操作系统在内核线程之间切换时,需要保存和恢复寄存器、刷新 TLB 等,当线程数量膨胀时,调度开销甚至超过实际工作负载。
  • 阻塞即浪费:当平台线程因为 IO 操作、网络请求或数据库查询而阻塞时,整个内核线程处于空闲等待状态,无法被其他任务复用。

虚拟线程的解决方案简洁而优雅:JVM 在用户态管理大量轻量级虚拟线程,并将它们动态映射到少量平台线程(称为载体线程)上执行。当虚拟线程遇到阻塞操作时,JVM 会自动将其从载体线程上”卸载”,让载体线程去执行其他就绪的虚拟线程。阻塞结束后,虚拟线程重新排队等待调度。这一机制使得开发者可以创建数十万甚至数百万个虚拟线程,而不会耗尽系统资源。

二、创建虚拟线程的三种方式

Java 21 提供了多种创建虚拟线程的方法,开发者可以根据场景灵活选择。

2.1 通过 Thread.ofVirtual() 直接创建

// 创建并立即启动一个虚拟线程
Thread vThread = Thread.ofVirtual().start(() -> {
    System.out.println("在虚拟线程中执行: " + Thread.currentThread());
});

// 等待虚拟线程执行完成
vThread.join();

// 创建但不立即启动,稍后手动启动
Thread unstarted = Thread.ofVirtual().unstarted(() -> {
    System.out.println("手动启动的虚拟线程");
});
unstarted.start();
unstarted.join();

2.2 通过 Executors.newVirtualThreadPerTaskExecutor() 创建

这是推荐的方式,它会为每个提交的任务创建一个新的虚拟线程:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// 创建虚拟线程执行器
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
    // 提交1000个任务,每个任务运行在独立的虚拟线程中
    for (int i = 0; i  {
            // 模拟IO操作
            Thread.sleep(100);
            System.out.println("任务 " + taskId + " 完成,线程: " + Thread.currentThread());
            return taskId;
        });
    }
} // try-with-resources 自动等待所有任务完成并关闭执行器

注意,Executors.newVirtualThreadPerTaskExecutor() 返回的执行器会为每个任务创建一个全新的虚拟线程,因此不需要指定线程池大小。虚拟线程的创建成本极低,每个大约只占用几百字节的内存。

2.3 通过 Thread.Builder 定制虚拟线程属性

Thread customVThread = Thread.ofVirtual()
    .name("custom-vthread-", 1)  // 设置线程名称前缀和起始编号
    .inheritInheritableThreadLocals(false) // 不继承可继承的线程局部变量
    .unstarted(() -> {
        System.out.println("定制虚拟线程运行中");
    });
customVThread.start();
customVThread.join();

三、性能对比:平台线程 vs 虚拟线程

我们用一段完整的基准测试代码来直观感受两者的差距。场景是模拟 10000 个并发网络请求,每个请求需要 200 毫秒的”处理时间”(模拟 IO 等待)。

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.*;

public class ThroughputBenchmark {
    private static final int TASK_COUNT = 10_000;
    private static final int SLEEP_MILLIS = 200;

    public static void main(String[] args) throws Exception {
        // 测试1:使用固定大小平台线程池(200个线程)
        benchmark("平台线程池(200)", Executors.newFixedThreadPool(200));

        // 测试2:使用缓存线程池(会为每个任务创建线程,但受限OS资源)
        // 注意:这个测试可能因为创建过多OS线程而导致OOM,实际运行需谨慎
        // benchmark("缓存线程池", Executors.newCachedThreadPool());

        // 测试3:使用虚拟线程执行器
        benchmark("虚拟线程执行器", Executors.newVirtualThreadPerTaskExecutor());
    }

    private static void benchmark(String label, ExecutorService executor) throws Exception {
        CountDownLatch latch = new CountDownLatch(TASK_COUNT);
        Instant start = Instant.now();

        for (int i = 0; i  {
                try {
                    Thread.sleep(SLEEP_MILLIS); // 模拟IO等待
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();
                }
            });
        }

        latch.await();
        Duration duration = Duration.between(start, Instant.now());
        double throughput = TASK_COUNT / duration.toMillis() * 1000.0;

        System.out.printf("%s → 耗时: %dms, 吞吐量: %.1f 任务/秒%n",
                label, duration.toMillis(), throughput);

        executor.shutdown();
    }
}

典型输出结果:

平台线程池(200) → 耗时: 10207ms, 吞吐量: 979.8 任务/秒
虚拟线程执行器 → 耗时: 513ms, 吞吐量: 19493.2 任务/秒

虚拟线程的吞吐量是平台线程池的近 20 倍,并且延迟显著降低。这是因为平台线程池中只有 200 个线程可以同时处理任务,其余 9800 个任务在队列中等待;而虚拟线程为每个任务分配一个独立的执行单元,10000 个任务几乎同时开始等待,200 毫秒后全部完成。

四、实战案例:将 Spring Boot 应用迁移到虚拟线程

Spring Boot 3.2 及以上版本已经内置了对虚拟线程的支持。要启用虚拟线程处理 HTTP 请求,只需在配置文件中添加一行:

# application.properties
spring.threads.virtual.enabled=true

这会让 Tomcat 或 Jetty 使用虚拟线程来处理每个到来的 HTTP 请求。但除了 Web 层,我们还需要确保所有异步调用和后台任务都迁移到虚拟线程上。

4.1 配置虚拟线程执行器 Bean

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Configuration
public class VirtualThreadConfig {

    @Bean(name = "virtualThreadExecutor")
    public ExecutorService virtualThreadExecutor() {
        return Executors.newVirtualThreadPerTaskExecutor();
    }
}

4.2 在服务层使用虚拟线程处理并发任务

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

@Service
public class OrderService {

    private final ExecutorService executor;
    private final InventoryClient inventoryClient;
    private final PaymentClient paymentClient;
    private final NotificationClient notificationClient;

    public OrderService(
            @Qualifier("virtualThreadExecutor") ExecutorService executor,
            InventoryClient inventoryClient,
            PaymentClient paymentClient,
            NotificationClient notificationClient) {
        this.executor = executor;
        this.inventoryClient = inventoryClient;
        this.paymentClient = paymentClient;
        this.notificationClient = notificationClient;
    }

    public OrderResult processOrder(Order order) {
        // 使用 CompletableFuture 在虚拟线程中并行调用多个下游服务
        CompletableFuture inventoryFuture = CompletableFuture
                .supplyAsync(() -> inventoryClient.checkStock(order.getProductId()), executor);

        CompletableFuture paymentFuture = CompletableFuture
                .supplyAsync(() -> paymentClient.authorize(order.getPaymentInfo()), executor);

        CompletableFuture userFuture = CompletableFuture
                .supplyAsync(() -> notificationClient.getUserProfile(order.getUserId()), executor);

        // 等待所有并行调用完成并组合结果
        return CompletableFuture.allOf(inventoryFuture, paymentFuture, userFuture)
                .thenApplyAsync(v -> {
                    int stock = inventoryFuture.join();
                    PaymentStatus payment = paymentFuture.join();
                    UserProfile user = userFuture.join();

                    if (stock 
                            notificationClient.sendConfirmation(user.getEmail(), order.getId()));

                    return new OrderResult("SUCCESS", order.getId());
                }, executor)
                .join(); // 在当前虚拟线程中阻塞等待最终结果
    }
}

在这个例子中,我们使用虚拟线程执行器来驱动 CompletableFuture 的异步操作。与使用默认的 ForkJoinPool 不同,虚拟线程执行器可以为每个 supplyAsync 调用分配独立的虚拟线程,即使下游服务响应缓慢,也不会耗尽线程资源。

五、虚拟线程与数据库连接池的适配

虚拟线程带来了一个重要的架构变化:当虚拟线程因数据库查询而阻塞时,它会被自动卸载,因此理论上我们可以用更少的数据库连接服务更多的并发请求。但传统的连接池(如 HikariCP)通常配置较小(例如 10-20 个连接),如果大量虚拟线程同时尝试获取连接,它们将在连接池的等待队列中排队,反而成为瓶颈。

解决方案是适当增大连接池大小,因为虚拟线程不会像平台线程那样消耗大量资源。以下是一个针对虚拟线程优化的 HikariCP 配置:

# 针对虚拟线程优化的数据库连接池配置
spring.datasource.hikari.maximum-pool-size=50
spring.datasource.hikari.minimum-idle=10
spring.datasource.hikari.connection-timeout=30000
spring.datasource.hikari.idle-timeout=600000
spring.datasource.hikari.max-lifetime=1800000

即使在虚拟线程环境下,连接池仍是必要的——创建数据库连接本身是昂贵的操作。但我们可以将连接池配置得更大,让更多虚拟线程能同时持有数据库连接执行查询,充分利用虚拟线程的并发优势。

六、处理线程局部变量:从 ThreadLocal 到 ScopedValue

虚拟线程可以拥有大量的实例,传统的 ThreadLocal 在使用时需要注意:如果每个虚拟线程都存储大量数据,内存占用会急剧膨胀。Java 21 同时引入了 ScopedValue(预览特性),它是一种不可变的、有作用域的变量,专门为虚拟线程设计。

import java.lang.ScopedValue;

public class RequestContextExample {
    // 定义一个 ScopedValue,用于在请求上下文中传递用户ID
    private static final ScopedValue CURRENT_USER = ScopedValue.newInstance();

    public void handleRequest(String userId) {
        // 在作用域内绑定值并执行业务逻辑
        ScopedValue.where(CURRENT_USER, userId)
                .run(() -> {
                    // 在这个代码块及其调用的任何方法中都可以获取到 userId
                    String currentUser = CURRENT_USER.get();
                    System.out.println("处理用户 " + currentUser + " 的请求");

                    // 调用其他服务方法,无需显式传递 userId
                    processBusinessLogic();
                });
    }

    private void processBusinessLogic() {
        // 即使在这里也能获取到作用域内的用户ID
        String user = CURRENT_USER.get();
        System.out.println("执行业务逻辑,当前用户: " + user);

        // 可以嵌套新的作用域,内层可以获取外层的值
        ScopedValue.where(CURRENT_USER, "admin-override")
                .run(() -> {
                    System.out.println("嵌套作用域内的用户: " + CURRENT_USER.get());
                });
    }
}

ThreadLocal 不同,ScopedValue 是不可变的,这避免了在虚拟线程被重新调度到不同载体线程时可能出现的状态混乱问题。如果你的项目仍在使用 ThreadLocal 存储请求上下文(如用户信息、事务ID),建议逐步迁移到 ScopedValue

七、结构化并发:管理虚拟线程的生命周期

虚拟线程与结构化并发(Structured Concurrency)是天生一对。结构化并发允许你将一组相关的并发任务视为一个单元,统一管理它们的生命周期和异常处理。Java 21 通过 StructuredTaskScope 提供了预览支持。

import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;

public class TravelPriceAggregator {

    public TravelPrice aggregatePrices(TravelRequest request) throws Exception {
        // 使用结构化任务作用域,所有子任务共享同一个生命周期
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // 同时查询三家航空公司的价格
            Future airlineA = scope.fork(() -> queryAirlineA(request));
            Future airlineB = scope.fork(() -> queryAirlineB(request));
            Future airlineC = scope.fork(() -> queryAirlineC(request));

            // 等待所有子任务完成,如果任一失败则取消其他任务
            scope.join();
            scope.throwIfFailed();

            // 聚合结果
            double bestPrice = Math.min(
                    Math.min(airlineA.resultNow(), airlineB.resultNow()),
                    airlineC.resultNow()
            );

            return new TravelPrice(bestPrice, request);
        }
    }

    private double queryAirlineA(TravelRequest request) throws InterruptedException {
        Thread.sleep(150); // 模拟API调用
        return 1280.0;
    }

    private double queryAirlineB(TravelRequest request) throws InterruptedException {
        Thread.sleep(200);
        return 1350.0;
    }

    private double queryAirlineC(TravelRequest request) throws InterruptedException {
        Thread.sleep(180);
        return 1199.0;
    }
}

try-with-resources 块中,StructuredTaskScope 确保所有通过 fork() 创建的子任务在作用域结束时都已完成或被取消。如果任何一个子任务失败,ShutdownOnFailure 策略会自动取消其他正在运行的子任务,避免了僵尸任务和资源泄漏。

八、构建一个纯虚拟线程的 HTTP 服务器

最后,我们使用 Java 标准库中的 HttpServer 结合虚拟线程,从头构建一个高性能的 HTTP API 服务,展示虚拟线程在非 Spring 环境中的应用。

import com.sun.net.httpserver.HttpServer;
import com.sun.net.httpserver.HttpExchange;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class VirtualThreadHttpServer {

    private static final Map userStore = new ConcurrentHashMap();
    private static long nextId = 1;

    public static void main(String[] args) throws Exception {
        // 创建HTTP服务器,绑定8080端口
        HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);

        // 关键:使用虚拟线程执行器来处理所有HTTP请求
        server.setExecutor(Executors.newVirtualThreadPerTaskExecutor());

        // 注册路由
        server.createContext("/api/users", VirtualThreadHttpServer::handleUsers);

        server.start();
        System.out.println("虚拟线程HTTP服务器已启动,监听端口 8080");

        // 注册优雅关闭钩子
        Runtime.getRuntime().addShutdownHook(
                Thread.ofPlatform().unstarted(() -> {
                    System.out.println("正在关闭服务器...");
                    server.stop(3);
                })
        );
    }

    private static void handleUsers(HttpExchange exchange) {
        try {
            String method = exchange.getRequestMethod();
            String path = exchange.getRequestURI().getPath();
            String[] segments = path.split("/");

            switch (method) {
                case "GET" -> {
                    if (segments.length == 4) {
                        // GET /api/users/{id}
                        long id = Long.parseLong(segments[3]);
                        User user = userStore.get(id);
                        if (user != null) {
                            sendJson(exchange, 200, toJson(user));
                        } else {
                            sendJson(exchange, 404, "{"error":"用户不存在"}");
                        }
                    } else {
                        // GET /api/users
                        sendJson(exchange, 200, toJson(userStore.values()));
                    }
                }
                case "POST" -> {
                    // POST /api/users
                    String body = new String(exchange.getRequestBody().readAllBytes(),
                            StandardCharsets.UTF_8);
                    User newUser = parseUser(body);
                    newUser = new User(nextId++, newUser.name(), newUser.email());
                    userStore.put(newUser.id(), newUser);
                    sendJson(exchange, 201, toJson(newUser));
                }
                case "PUT" -> {
                    if (segments.length == 4) {
                        long id = Long.parseLong(segments[3]);
                        String body = new String(exchange.getRequestBody().readAllBytes(),
                                StandardCharsets.UTF_8);
                        User updated = parseUser(body);
                        User user = new User(id, updated.name(), updated.email());
                        userStore.put(id, user);
                        sendJson(exchange, 200, toJson(user));
                    } else {
                        sendJson(exchange, 400, "{"error":"缺少用户ID"}");
                    }
                }
                case "DELETE" -> {
                    if (segments.length == 4) {
                        long id = Long.parseLong(segments[3]);
                        userStore.remove(id);
                        sendJson(exchange, 204, "");
                    } else {
                        sendJson(exchange, 400, "{"error":"缺少用户ID"}");
                    }
                }
                default -> sendJson(exchange, 405, "{"error":"不支持的HTTP方法"}");
            }
        } catch (Exception e) {
            sendJson(exchange, 500, "{"error":"" + e.getMessage() + ""}");
        }
    }

    private static void sendJson(HttpExchange exchange, int statusCode, String json) {
        try {
            byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().set("Content-Type", "application/json; charset=UTF-8");
            exchange.sendResponseHeaders(statusCode, bytes.length);
            exchange.getResponseBody().write(bytes);
            exchange.getResponseBody().close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static String toJson(Object obj) {
        // 简化实现,生产环境请使用 Jackson 或 Gson
        if (obj instanceof User user) {
            return String.format("{"id":%d,"name":"%s","email":"%s"}",
                    user.id(), user.name(), user.email());
        } else if (obj instanceof Iterable users) {
            StringBuilder sb = new StringBuilder("[");
            boolean first = true;
            for (Object u : users) {
                if (!first) sb.append(",");
                sb.append(toJson(u));
                first = false;
            }
            sb.append("]");
            return sb.toString();
        }
        return "{}";
    }

    private static User parseUser(String json) {
        // 极简JSON解析,生产环境请使用Jackson
        String name = extractJsonValue(json, "name");
        String email = extractJsonValue(json, "email");
        return new User(0, name, email);
    }

    private static String extractJsonValue(String json, String key) {
        String searchKey = """ + key + "":"";
        int start = json.indexOf(searchKey);
        if (start == -1) return "";
        start += searchKey.length();
        int end = json.indexOf(""", start);
        return end == -1 ? "" : json.substring(start, end);
    }

    record User(long id, String name, String email) {}
}

启动该服务器后,每个到达的 HTTP 请求都会在独立的虚拟线程中处理。对于包含阻塞 IO(如数据库查询)的请求,虚拟线程会自动让出载体线程,使服务器能够同时处理数万个并发连接。这个简单的实现已经具备生产级并发能力,而代码量不到 150 行。

九、虚拟线程的最佳实践与注意事项

  • 避免在虚拟线程中使用 synchronized 块:synchronized 块内部发生阻塞时,虚拟线程不会卸载,会导致载体线程被固定(pinning)。应优先使用 ReentrantLock 替代。
  • 不要池化虚拟线程:虚拟线程的设计初衷就是即用即弃,不应该放入线程池复用。需要限制并发数时,使用信号量(Semaphore)来控制。
  • 监控载体线程:虽然虚拟线程很轻量,但载体线程(ForkJoinPool 的工作线程)数量有限。如果所有载体线程都被固定(例如在 synchronized 块中阻塞),新的虚拟线程将无法执行。
  • 小心本地方法调用:在 JNI 调用期间,虚拟线程无法卸载,会一直占用载体线程。对于需要调用本地库的场景,考虑将相关操作隔离到专用的平台线程池中。
  • 逐步迁移:不需要一次性将所有代码都切换到虚拟线程。先在 IO 密集型、高并发的模块中启用,观察性能表现后再推广。

十、总结

Java 21 虚拟线程是 Java 平台近十年来最重要的并发编程改进。它让开发者可以用熟悉的同步编程模型编写出高吞吐量的应用程序,而无需学习复杂的响应式框架或回调链。从本文的基准测试可以看出,在 IO 密集型场景中,虚拟线程能将吞吐量提升一个数量级。

对于现有项目,迁移到虚拟线程的路径非常平滑:将 Executors.newFixedThreadPool(n) 替换为 Executors.newVirtualThreadPerTaskExecutor(),然后逐步排查 synchronized 块和 ThreadLocal 的使用。Spring Boot 3.2 用户更是可以通过一个配置项直接启用虚拟线程,享受其带来的性能红利。

虚拟线程并不是要取代所有现有的并发模型,而是提供了一种在简单性和性能之间取得优雅平衡的新选择。当你的下一个项目面临高并发挑战时,不妨让虚拟线程成为你的首选方案。

Java 21 虚拟线程深度实战:从平台线程到千万级并发的完整迁移指南
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

版权声明:
本站资源有的来自互联网收集整理,本站纯免费分享提供学习使用,如果侵犯了您的合法权益,请联系本站我们会及时删除。
本站资源仅供研究、学习交流之用,免费开源项目不代表完全可商用,若商业用途请先咨询开发企业能否商用,否则产生的一切后果将由下载用户自行承担。
原创板块未经允许不得转载,否则将追究法律责任。

淘吗网 java Java 21 虚拟线程深度实战:从平台线程到千万级并发的完整迁移指南 https://www.taomawang.com/server/java/2081.html

常见问题

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务