一、项目架构与技术选型
随着Java 21的发布,虚拟线程(Virtual Threads,源自Project Loom,并已在Java 21中通过JEP 444成为正式特性)为高并发应用带来了革命性的性能提升。本教程将基于Java 21、Spring Boot 3.x和Project Loom构建一个面向百万级并发连接的微服务网关系统。
1.1 技术栈与版本要求
// 核心技术栈 - Java 21+ (LTS版本) - Spring Boot 3.2.x - Spring Framework 6.x - Project Reactor - Netty 4.1.x - Redis 7.x 集群 - PostgreSQL 15.x // 构建配置 <properties> <maven.compiler.source>21</maven.compiler.source> <maven.compiler.target>21</maven.compiler.target> <spring-boot.version>3.2.0</spring-boot.version> <projectreactor.version>3.6.0</projectreactor.version> </properties> // 项目模块结构 gateway-system/ ├── gateway-core/ # 网关核心模块 ├── gateway-admin/ # 管理控制台 ├── gateway-discovery/ # 服务发现 ├── gateway-router/ # 动态路由 ├── gateway-filter/ # 过滤器链 └── gateway-monitor/ # 监控统计
1.2 虚拟线程配置与优化
// 虚拟线程执行器配置 @Configuration public class VirtualThreadConfig { @Bean public TaskExecutor virtualThreadExecutor() { return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor()); } @Bean public TomcatProtocolHandlerCustomizer<?> protocolHandlerVirtualThreadExecutorCustomizer() { return protocolHandler -> { protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor()); }; } } // JVM参数优化 -XX:+UseZGC -XX:MaxRAMPercentage=75 -XX:ActiveProcessorCount=4 -Dspring.threads.virtual.enabled=true -Djdk.virtualThreadScheduler.parallelism=8 -Djdk.virtualThreadScheduler.maxPoolSize=256
二、网关核心架构设计
2.1 响应式网关服务器
// 基于Netty的响应式HTTP服务器 @Component public class ReactiveGatewayServer { private final DispatcherHandler dispatcherHandler; private final GatewayMetrics metrics; public ReactiveGatewayServer(DispatcherHandler dispatcherHandler, GatewayMetrics metrics) { this.dispatcherHandler = dispatcherHandler; this.metrics = metrics; } public Mono<Void> start(int port) { return HttpServer.create() .port(port) .handle(this::handleRequest) .bind() .doOnSuccess(server -> { log.info("Gateway server started on port: {}", port); this.server = server; }) .then(); } private Mono<Void> handleRequest(HttpServerRequest request, HttpServerResponse response) { return Mono.fromCallable(() -> System.nanoTime()) .flatMap(startTime -> { ServerWebExchange exchange = createExchange(request, response); return dispatcherHandler.handle(exchange) .doOnSuccess(v -> recordMetrics(startTime, exchange, true)) .doOnError(e -> recordMetrics(startTime, exchange, false)); }) .onErrorResume(e -> handleError(response, e)); } private ServerWebExchange createExchange(HttpServerRequest request, HttpServerResponse response) { ServerHttpRequest httpRequest = new ReactorHttpRequestAdapter(request); ServerHttpResponse httpResponse = new ReactorHttpResponseAdapter(response); return new DefaultServerWebExchange(httpRequest, httpResponse, new InMemoryWebSessionManager()); } } // 高性能JSON序列化配置 @Configuration public class JacksonConfig { @Bean public Jackson2ObjectMapperBuilderCustomizer jacksonCustomizer() { return builder -> { builder.featuresToEnable(StreamReadFeature.USE_FAST_BIG_DECIMAL_PARSER); builder.featuresToEnable(StreamWriteFeature.USE_FAST_DOUBLE_WRITER); builder.featuresToEnable(JsonWriteFeature.WRITE_NUMBERS_AS_STRINGS); builder.modulesToInstall(new JavaTimeModule()); }; } }
2.2 动态路由引擎
// 基于虚拟线程的动态路由服务 @Service public class VirtualThreadRoutingService { private final RouteRepository routeRepository; private final LoadBalancerClient loadBalancer; private final ConcurrentHashMap<String, Route> routeCache = new ConcurrentHashMap<>(); @Async("virtualThreadExecutor") public CompletableFuture<RouteResult> routeAsync(HttpRequest request) { return CompletableFuture.supplyAsync(() -> route(request), Thread.ofVirtual().factory()); } public RouteResult route(HttpRequest request) { String path = request.getPath(); String method = request.getMethod().name(); // 从缓存中获取路由 Route route = routeCache.computeIfAbsent( buildRouteKey(path, method), k -> findRouteFromRepository(path, method) ); if (route == null) { throw new RouteNotFoundException("No route found for: " + path); } // 负载均衡选择服务实例 ServiceInstance instance = loadBalancer.choose(route.getServiceId()); return RouteResult.builder() .route(route) .instance(instance) .targetUrl(buildTargetUrl(instance, request)) .build(); } private Route findRouteFromRepository(String path, String method) { try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { var future = executor.submit(() -> routeRepository.findByPathAndMethod(path, method)); return future.get(100, TimeUnit.MILLISECONDS); } catch (Exception e) { log.warn("Failed to find route from repository", e); return null; } } // 路由规则定义 @Data @Builder public static class Route { private String id; private String path; private String method; private String serviceId; private List<FilterDefinition> filters; private Map<String, Object> metadata; private int order; private boolean enabled; } }
三、过滤器链与限流熔断
3.1 虚拟线程友好的过滤器链
// 网关过滤器接口 public interface GatewayFilter { String getName(); Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain); default int getOrder() { return 0; } } // 基于虚拟线程的过滤器链执行器 @Component public class VirtualThreadFilterChain implements GatewayFilterChain { private final List<GatewayFilter> filters; private final GatewayMetrics metrics; private int index = 0; @Override public Mono<Void> filter(ServerWebExchange exchange) { return Mono.fromCompletionStage(CompletableFuture.supplyAsync(() -> { if (this.index < filters.size()) { GatewayFilter filter = filters.get(this.index++); String filterName = filter.getName(); return metrics.recordFilterTime(filterName, () -> filter.filter(exchange, this).toFuture() ).thenCompose(Function.identity()); } else { return CompletableFuture.completedFuture(null); } }, Thread.ofVirtual().factory())).then(); } } // 认证过滤器实现 @Component public class AuthenticationFilter implements GatewayFilter { private final JwtTokenProvider tokenProvider; private final RedisTemplate<String, String> redisTemplate; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { return Mono.fromCallable(() -> { String token = extractToken(exchange.getRequest()); if (token == null) { throw new AuthenticationException("Missing authentication token"); } // 使用虚拟线程执行阻塞操作 try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { var future = executor.submit(() -> { if (!tokenProvider.validateToken(token)) { throw new AuthenticationException("Invalid token"); } String username = tokenProvider.getUsername(token); String blacklisted = redisTemplate.opsForValue() .get("token:blacklist:" + token); if (blacklisted != null) { throw new AuthenticationException("Token is blacklisted"); } // 将用户信息添加到请求头 exchange.getRequest().mutate() .header("X-User-Id", username) .build(); return null; }); return future.get(500, TimeUnit.MILLISECONDS); } }).then(chain.filter(exchange)); } }
3.2 分布式限流与熔断
// 基于Redis的分布式限流器 @Component public class RedisRateLimiter { private final RedisTemplate<String, String> redisTemplate; private final ScriptExecutor<String> scriptExecutor; public boolean isAllowed(String key, int maxRequests, long windowInSeconds) { String luaScript = """ local key = KEYS[1] local max = tonumber(ARGV[1]) local window = tonumber(ARGV[2]) local current = redis.call('GET', key) local now = redis.call('TIME') local currentTime = tonumber(now[1]) * 1000 + tonumber(now[2]) / 1000 if current == false then redis.call('SET', key, 1) redis.call('PEXPIRE', key, window * 1000) return 1 end local requests = tonumber(current) if requests < max then redis.call('INCR', key) return 1 else return 0 end """; List<String> keys = Collections.singletonList(key); List<String> args = Arrays.asList( String.valueOf(maxRequests), String.valueOf(windowInSeconds) ); Long result = redisTemplate.execute( new DefaultRedisScript<>(luaScript, Long.class), keys, args.toArray() ); return result != null && result == 1; } } // 熔断器实现 @Component public class CircuitBreaker { private final Map<String, CircuitState> states = new ConcurrentHashMap<>(); private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory()); @Data private static class CircuitState { private State state = State.CLOSED; private int failureCount = 0; private long lastFailureTime = 0; private long lastAttemptTime = 0; private final int failureThreshold; private final long timeout; enum State { CLOSED, OPEN, HALF_OPEN } } public <T> Mono<T> execute(String serviceName, Mono<T> operation) { CircuitState state = states.computeIfAbsent(serviceName, k -> new CircuitState(5, 30000L)); // 5次失败,30秒超时 if (state.getState() == CircuitState.State.OPEN) { if (System.currentTimeMillis() - state.getLastAttemptTime() > state.getTimeout()) { state.setState(CircuitState.State.HALF_OPEN); state.setLastAttemptTime(System.currentTimeMillis()); } else { return Mono.error(new 
CircuitBreakerOpenException("Circuit breaker is open")); } } return operation .doOnSuccess(result -> onSuccess(state)) .doOnError(error -> onError(state, error)) .onErrorResume(error -> handleCircuitState(state, error)); } private void onSuccess(CircuitState state) { state.setFailureCount(0); state.setState(CircuitState.State.CLOSED); } private void onError(CircuitState state, Throwable error) { state.setFailureCount(state.getFailureCount() + 1); state.setLastFailureTime(System.currentTimeMillis()); if (state.getFailureCount() >= state.getFailureThreshold()) { state.setState(CircuitState.State.OPEN); state.setLastAttemptTime(System.currentTimeMillis()); // 安排状态检查 scheduler.schedule(() -> { if (state.getState() == CircuitState.State.OPEN) { state.setState(CircuitState.State.HALF_OPEN); } }, state.getTimeout(), TimeUnit.MILLISECONDS); } } }
四、服务发现与负载均衡
4.1 响应式服务注册发现
// 基于虚拟线程的服务注册中心客户端 @Component public class VirtualThreadDiscoveryClient implements DiscoveryClient { private final WebClient webClient; private final ScheduledExecutorService heartbeatExecutor; private final String serviceName; private final String instanceId; @EventListener(ApplicationReadyEvent.class) public void registerService() { // 使用虚拟线程执行服务注册 Thread.startVirtualThread(() -> { ServiceInstance instance = createServiceInstance(); while (!Thread.currentThread().isInterrupted()) { try { webClient.post() .uri("/registry/register") .bodyValue(instance) .retrieve() .bodyToMono(Void.class) .block(); // 发送心跳 sendHeartbeat(); Thread.sleep(30000); // 30秒心跳间隔 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.warn("Service registration failed, retrying...", e); sleepWithBackoff(); } } }); } private void sendHeartbeat() { CompletableFuture.runAsync(() -> { try { webClient.put() .uri("/registry/heartbeat/{instanceId}", instanceId) .retrieve() .bodyToMono(Void.class) .block(); } catch (Exception e) { log.warn("Heartbeat failed", e); } }, Thread.ofVirtual().factory()); } @Override public List<ServiceInstance> getInstances(String serviceId) { try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { var future = executor.submit(() -> webClient.get() .uri("/registry/services/{serviceId}", serviceId) .retrieve() .bodyToFlux(ServiceInstance.class) .collectList() .block() ); return future.get(1000, TimeUnit.MILLISECONDS); } catch (Exception e) { log.error("Failed to get service instances", e); return Collections.emptyList(); } } } // 负载均衡策略 @Component public class WeightedLoadBalancer implements LoadBalancer { private final Random random = new Random(); @Override public ServiceInstance choose(String serviceId, List<ServiceInstance> instances) { if (instances.isEmpty()) { throw new NoAvailableInstanceException("No available instances for: " + serviceId); } // 计算总权重 int totalWeight = instances.stream() .mapToInt(instance -> 
getWeight(instance)) .sum(); // 随机选择 int randomWeight = random.nextInt(totalWeight); int currentWeight = 0; for (ServiceInstance instance : instances) { currentWeight += getWeight(instance); if (randomWeight < currentWeight) { return instance; } } return instances.get(instances.size() - 1); } private int getWeight(ServiceInstance instance) { Object weight = instance.getMetadata().get("weight"); return weight != null ? Integer.parseInt(weight.toString()) : 1; } }
五、监控与可观测性
5.1 基于Micrometer的指标收集
// 网关指标监控 @Component public class GatewayMetrics { private final MeterRegistry meterRegistry; private final Timer requestTimer; private final Counter errorCounter; private final Gauge routeGauge; public GatewayMetrics(MeterRegistry meterRegistry) { this.meterRegistry = meterRegistry; this.requestTimer = Timer.builder("gateway.requests") .description("Gateway request processing time") .publishPercentiles(0.5, 0.95, 0.99) .register(meterRegistry); this.errorCounter = Counter.builder("gateway.errors") .description("Gateway error count") .tag("type", "gateway") .register(meterRegistry); this.routeGauge = Gauge.builder("gateway.routes.active") .description("Active routes count") .register(meterRegistry); } public <T> Mono<T> recordRequestTime(Mono<T> mono, String routeId) { return Mono.fromCallable(System::nanoTime) .flatMap(startTime -> mono .doOnSuccess(v -> recordSuccess(startTime, routeId)) .doOnError(e -> recordError(startTime, routeId, e)) ); } private void recordSuccess(long startTime, String routeId) { long duration = System.nanoTime() - startTime; requestTimer.record(duration, TimeUnit.NANOSECONDS, Tags.of(Tag.of("route", routeId), Tag.of("status", "success"))); } private void recordError(long startTime, String routeId, Throwable error) { long duration = System.nanoTime() - startTime; requestTimer.record(duration, TimeUnit.NANOSECONDS, Tags.of(Tag.of("route", routeId), Tag.of("status", "error"))); errorCounter.increment(); } } // 分布式链路追踪 @Component public class GatewayTracing { private final Tracer tracer; public Span startSpan(String name, HttpRequest request) { Span span = tracer.spanBuilder(name) .setAttribute("http.method", request.getMethod().name()) .setAttribute("http.path", request.getPath()) .setAttribute("http.headers", request.getHeaders().toString()) .startSpan(); // 从请求头中提取跟踪信息 String traceId = request.getHeader("X-Trace-Id"); if (traceId != null) { span.setAttribute("trace.id", traceId); } return span; } public void endSpan(Span span, HttpResponse 
response) { span.setAttribute("http.status_code", response.getStatusCode().value()); span.end(); } }
5.2 性能测试与优化
// 虚拟线程性能测试 @SpringBootTest @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class VirtualThreadPerformanceTest { @Autowired private WebTestClient webTestClient; @Test void testHighConcurrencyWithVirtualThreads() { int concurrentRequests = 10000; var futures = new CompletableFuture[concurrentRequests]; long startTime = System.currentTimeMillis(); // 使用虚拟线程执行并发请求 try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { for (int i = 0; i < concurrentRequests; i++) { final int index = i; futures[i] = CompletableFuture.supplyAsync(() -> { return webTestClient.get() .uri("/api/test/" + index) .exchange() .expectStatus().isOk() .returnResult(String.class); }, executor); } // 等待所有请求完成 CompletableFuture.allOf(futures).join(); } long endTime = System.currentTimeMillis(); long duration = endTime - startTime; System.out.printf("Processed %d requests in %d ms (%d req/s)%n", concurrentRequests, duration, concurrentRequests * 1000L / duration); } } // JVM监控配置 @Configuration public class MonitoringConfig { @Bean public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() { return registry -> registry.config().commonTags( "application", "gateway-system", "environment", "production", "region", System.getenv("REGION") ); } @Bean public InfluxMeterRegistry influxMeterRegistry() { return InfluxMeterRegistry.builder(InfluxConfig.DEFAULT) .clock(Clock.SYSTEM) .build(); } }
六、部署与生产实践
6.1 Docker容器化部署
# Dockerfile
FROM eclipse-temurin:21-jre-jammy

# JVM options
ENV JAVA_OPTS="-XX:+UseZGC -XX:MaxRAMPercentage=75 -Dspring.threads.virtual.enabled=true"

# Create a non-root user
RUN groupadd -r gateway && useradd -r -g gateway gateway
USER gateway

# Copy the application (absolute destination, matching the path used by ENTRYPOINT)
COPY target/gateway-system.jar /app.jar

# Health check
# NOTE(review): assumes curl is present in the base image - confirm.
HEALTHCHECK --interval=30s --timeout=3s --start-period=60s --retries=3 \
  CMD curl -f http://localhost:8080/actuator/health || exit 1

# Start the application.
# "exec" replaces the shell with the JVM so that the JVM receives SIGTERM from
# "docker stop" directly and can shut down gracefully. $JAVA_OPTS is left unquoted
# on purpose so that it splits into separate arguments.
ENTRYPOINT ["sh", "-c", "exec java $JAVA_OPTS -jar /app.jar"]

# docker-compose.yml
version: '3.8'
services:
  gateway:
    build: .
    ports:
      - "8080:8080"
    environment:
      - SPRING_PROFILES_ACTIVE=production
      - REDIS_HOST=redis-cluster
      - POSTGRES_HOST=postgres
    deploy:
      resources:
        limits:
          memory: 2G
        reservations:
          memory: 1G
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/actuator/health"]
      interval: 30s
      timeout: 10s
      retries: 3
6.2 生产环境配置
# application-production.yml
# NOTE(review): the nesting below is reconstructed from a flattened listing; in
# particular, confirm that "metrics.enabled" belongs under management.endpoint.
server:
  port: 8080
  netty:
    connection-timeout: 30s
    idle-timeout: 60s

spring:
  threads:
    virtual:
      enabled: true
  data:
    redis:
      cluster:
        nodes: redis-cluster:6379
      timeout: 2s
      lettuce:
        pool:
          max-active: 32
          max-idle: 16
          min-idle: 8

management:
  endpoints:
    web:
      exposure:
        include: health,metrics,info,prometheus
  endpoint:
    health:
      show-details: always
    metrics:
      enabled: true

gateway:
  rate-limiter:
    enabled: true
    default-requests-per-second: 1000
  circuit-breaker:
    enabled: true
    failure-threshold: 5
    timeout: 30000
总结
本教程详细介绍了基于Java 21虚拟线程技术构建高性能微服务网关系统的完整方案。通过虚拟线程、响应式编程、分布式限流等现代Java技术,实现了能够支撑百万级并发连接的高性能网关系统。
系统核心优势:
- 基于Java 21虚拟线程,实现百万级并发连接
- 响应式架构,非阻塞IO处理
- 动态路由与服务发现
- 分布式限流与熔断保护
- 完整的监控与可观测性
- 容器化部署与弹性伸缩
该系统展示了Java在现代微服务架构中的强大能力,为构建高性能、高可用的分布式系统提供了完整的技术方案。开发者可以基于此架构扩展更多高级功能,如API聚合、协议转换、安全防护等企业级需求。