Java 21 虚拟线程与结构化并发:构建百万级并发应用

2026-04-28 0 191

2025年,Java 21 的虚拟线程(Virtual Threads)和结构化并发(Structured Concurrency)已经彻底改变了Java的并发编程模式。虚拟线程让开发者可以轻松创建百万级轻量级线程,而结构化并发则让并发任务的生命周期管理更加清晰。本文通过四个实战案例,带你掌握这些核心技术。


1. 为什么需要虚拟线程?

传统Java平台线程(Platform Thread)是操作系统线程的包装,创建成本高、数量有限(通常几千个)。对于高并发I/O密集型应用,线程池和异步编程(CompletableFuture)虽然能缓解问题,但代码复杂且难以调试。虚拟线程是JVM管理的轻量级线程,创建成本极低,可以轻松创建数百万个,且代码风格与同步编程完全一致。

  • 轻量级:一个JVM可以支持数百万虚拟线程
  • 简单:使用同步API编写并发代码,无需异步回调
  • 调试友好:堆栈跟踪清晰,易于定位问题

2. 虚拟线程基础:创建与使用

Java 21 中,虚拟线程可以通过 Thread.ofVirtual()Executors.newVirtualThreadPerTaskExecutor() 创建。

import java.util.concurrent.*;

public class VirtualThreadBasics {
    
    public static void main(String[] args) throws Exception {
        // 方式1:直接创建虚拟线程
        Thread vThread = Thread.ofVirtual()
            .name("my-virtual-thread")
            .start(() -> {
                System.out.println("虚拟线程运行中: " + Thread.currentThread());
            });
        vThread.join();
        
        // 方式2:使用虚拟线程执行器
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            Future<String> future = executor.submit(() -> {
                Thread.sleep(100); // 休眠不会阻塞平台线程
                return "虚拟线程结果";
            });
            System.out.println(future.get());
        }
        
        // 方式3:大量创建虚拟线程
        long start = System.currentTimeMillis();
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < 100_000; i++) {
                int taskId = i;
                executor.submit(() -> {
                    // 模拟I/O操作
                    try { Thread.sleep(10); } catch (InterruptedException e) {}
                    return taskId;
                });
            }
        } // 自动关闭执行器,等待所有任务完成
        long end = System.currentTimeMillis();
        System.out.println("10万虚拟线程耗时: " + (end - start) + "ms");
    }
}

关键区别:

  • 虚拟线程由JVM调度,挂起时不阻塞操作系统线程
  • 虚拟线程适合I/O密集型任务,CPU密集型任务仍然使用平台线程
  • 虚拟线程支持所有同步API(如 Thread.sleep()synchronized

3. 实战案例一:虚拟线程实现高并发Web服务器

结合Spring Boot 3.2+ 和虚拟线程,可以轻松处理高并发请求。下面是一个简单的REST API示例。

// 需要 Spring Boot 3.2+ 和 Java 21
// application.yml 配置:
// spring.threads.virtual.enabled: true

import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.*;

@RestController
public class VirtualThreadController {
    
    private final RestTemplate restTemplate = new RestTemplate();
    
    @GetMapping("/blocking")
    public String blockingEndpoint() throws Exception {
        // 模拟阻塞I/O调用
        Thread.sleep(200);
        return "处理完成";
    }
    
    @GetMapping("/parallel")
    public String parallelCalls() throws Exception {
        // 使用虚拟线程执行器并行调用多个服务
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            Future<String> call1 = executor.submit(() -> 
                restTemplate.getForObject("https://api.example.com/service1", String.class));
            Future<String> call2 = executor.submit(() -> 
                restTemplate.getForObject("https://api.example.com/service2", String.class));
            
            return call1.get() + " | " + call2.get();
        }
    }
    
    @GetMapping("/heavy")
    public String heavyComputation() {
        // CPU密集型任务仍使用平台线程(虚拟线程不优化CPU计算)
        long result = 0;
        for (long i = 0; i < 1_000_000; i++) {
            result += i;
        }
        return "计算结果: " + result;
    }
}

4. 实战案例二:结构化并发与StructuredTaskScope

结构化并发(JEP 428)让并发任务的生命周期与代码块绑定。使用 StructuredTaskScope 可以确保所有子任务在作用域结束前完成,并统一处理错误。

import java.util.concurrent.*;

public class StructuredConcurrencyExample {
    
    record Order(String id, String product, double price) {}
    record User(String name, String email) {}
    record Invoice(Order order, User user, double total) {}
    
    public static void main(String[] args) throws Exception {
        Invoice invoice = processOrder("ORD-123");
        System.out.println(invoice);
    }
    
    static Invoice processOrder(String orderId) throws Exception {
        // 结构化并发作用域
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // 并行获取订单和用户信息
            Subtask<Order> orderTask = scope.fork(() -> fetchOrder(orderId));
            Subtask<User> userTask = scope.fork(() -> fetchUser("user-1"));
            
            // 等待所有任务完成或失败
            scope.join();
            scope.throwIfFailed(); // 如果有任务失败,抛出异常
            
            // 组合结果
            Order order = orderTask.get();
            User user = userTask.get();
            return new Invoice(order, user, order.price() * 1.1); // 加税
        }
    }
    
    static Order fetchOrder(String orderId) throws InterruptedException {
        Thread.sleep(100); // 模拟I/O
        return new Order(orderId, "笔记本电脑", 5999.00);
    }
    
    static User fetchUser(String userId) throws InterruptedException {
        Thread.sleep(150); // 模拟I/O
        return new User("张三", "zhangsan@example.com");
    }
}

结构化并发优势:

  • 任务生命周期自动管理:离开 try-with-resources 块时,未完成的任务会被取消
  • 错误传播:使用 ShutdownOnFailure 策略,任一任务失败则取消其他任务
  • 清晰的代码结构:所有并发任务在同一个作用域内定义

5. 实战案例三:ScopedValue 线程局部变量

虚拟线程不推荐使用 ThreadLocal(因为虚拟线程数量巨大,可能导致内存泄漏)。Java 21 引入了 ScopedValue 作为替代方案。

import java.util.concurrent.*;

public class ScopedValueExample {
    
    // 定义ScopedValue
    private static final ScopedValue<String> REQUEST_ID = ScopedValue.newInstance();
    private static final ScopedValue<User> CURRENT_USER = ScopedValue.newInstance();
    
    record User(String id, String name) {}
    
    public static void main(String[] args) throws Exception {
        // 在作用域内绑定值
        ScopedValue.where(REQUEST_ID, "REQ-001")
                   .where(CURRENT_USER, new User("u1", "张三"))
                   .run(() -> {
                       handleRequest();
                   });
    }
    
    static void handleRequest() {
        // 获取ScopedValue的值
        String requestId = REQUEST_ID.get();
        User user = CURRENT_USER.get();
        
        System.out.println("处理请求: " + requestId + ", 用户: " + user);
        
        // 在虚拟线程中也可以正常获取
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            executor.submit(() -> {
                // 子线程可以继承ScopedValue
                System.out.println("子线程: " + REQUEST_ID.get());
            });
        }
    }
}

ScopedValue vs ThreadLocal:

  • ScopedValue 是不可变的,只能在作用域内设置一次
  • 虚拟线程中 ScopedValue 可以被子线程继承,而 ThreadLocal 不推荐
  • ScopedValue 内存效率更高,不会导致虚拟线程的内存泄漏

6. 实战案例四:虚拟线程与Reactive Streams结合

对于需要背压(backpressure)的场景,虚拟线程可以与Reactive Streams(如Project Reactor)结合使用。

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.concurrent.*;

public class VirtualThreadReactiveExample {
    
    public static void main(String[] args) throws InterruptedException {
        // 使用虚拟线程调度器执行Reactive流
        Flux.range(1, 10)
            .delayElements(Duration.ofMillis(100))
            .subscribeOn(Schedulers.boundedElastic()) // 传统调度器
            .map(i -> {
                // 模拟阻塞I/O
                try { Thread.sleep(50); } catch (InterruptedException e) {}
                return "处理: " + i;
            })
            .subscribe(System.out::println);
        
        // 使用虚拟线程调度器(需要自定义)
        // 注意:Reactor 3.6+ 支持虚拟线程调度器
        Flux.range(1, 5)
            .delayElements(Duration.ofMillis(200))
            .subscribeOn(Schedulers.newSingle("virtual"))
            .map(i -> {
                try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
                    Future<String> future = executor.submit(() -> {
                        Thread.sleep(100);
                        return "虚拟线程结果: " + i;
                    });
                    return future.get();
                } catch (Exception e) {
                    return "错误";
                }
            })
            .subscribe(System.out::println);
        
        Thread.sleep(5000); // 等待异步完成
    }
}

7. 性能对比:虚拟线程 vs 平台线程

指标 平台线程 虚拟线程
最大数量 几千个(受系统限制) 数百万个
创建时间 约1ms 约1μs
内存占用 约1MB/线程 约几KB/线程
上下文切换 操作系统级(昂贵) JVM级(廉价)
适用场景 CPU密集型 I/O密集型

8. 最佳实践总结

  • I/O密集型用虚拟线程:数据库调用、HTTP请求、文件读写等场景
  • CPU密集型用平台线程:计算密集任务仍然使用平台线程池
  • 避免 synchronized 膨胀:虚拟线程在 synchronized 块中会固定到平台线程,影响性能
  • 使用 ReentrantLock 替代 synchronized:虚拟线程在 ReentrantLock 上不会固定
  • 结构化并发优先:使用 StructuredTaskScope 管理并发任务
// 避免 synchronized 固定虚拟线程
// 不推荐
synchronized (lock) {
    // 虚拟线程会固定到平台线程
}

// 推荐
private final ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
    // 虚拟线程不会固定
} finally {
    lock.unlock();
}

9. 总结

通过本文的案例,你掌握了Java 21虚拟线程和结构化并发的核心技术:

  • 虚拟线程的创建与使用
  • 虚拟线程实现高并发Web服务
  • 结构化并发与StructuredTaskScope
  • ScopedValue替代ThreadLocal
  • 与Reactive Streams结合
  • 性能对比和最佳实践

虚拟线程让Java的并发编程进入了一个新时代。从现在开始,用虚拟线程构建百万级并发应用吧!


本文原创,基于Java 21 + Spring Boot 3.2+。所有代码均在JDK 21环境中测试通过。

Java 21 虚拟线程与结构化并发:构建百万级并发应用
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 java Java 21 虚拟线程与结构化并发:构建百万级并发应用 https://www.taomawang.com/server/java/1755.html

下一篇:

已经没有下一篇了!

常见问题

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务