一、项目架构与技术选型
随着Java 21的发布,虚拟线程(Virtual Threads)为高并发应用带来了革命性的性能提升。本教程将基于Java 21、Spring Boot 3.x和Project Loom构建一个支持亿级并发的微服务网关系统。
1.1 技术栈与版本要求
// 核心技术栈
- Java 21+ (LTS版本)
- Spring Boot 3.2.x
- Spring Framework 6.x
- Project Reactor
- Netty 4.1.x
- Redis 7.x 集群
- PostgreSQL 15.x
// 构建配置
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<spring-boot.version>3.2.0</spring-boot.version>
<projectreactor.version>3.6.0</projectreactor.version>
</properties>
// 项目模块结构
gateway-system/
├── gateway-core/ # 网关核心模块
├── gateway-admin/ # 管理控制台
├── gateway-discovery/ # 服务发现
├── gateway-router/ # 动态路由
├── gateway-filter/ # 过滤器链
└── gateway-monitor/ # 监控统计
1.2 虚拟线程配置与优化
// 虚拟线程执行器配置
@Configuration
public class VirtualThreadConfig {
@Bean
public TaskExecutor virtualThreadExecutor() {
return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor());
}
@Bean
public TomcatProtocolHandlerCustomizer<?> protocolHandlerVirtualThreadExecutorCustomizer() {
return protocolHandler -> {
protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
};
}
}
// JVM参数优化
-XX:+UseZGC
-XX:MaxRAMPercentage=75
-XX:ActiveProcessorCount=4
-Dspring.threads.virtual.enabled=true
-Djdk.virtualThreadScheduler.parallelism=8
-Djdk.virtualThreadScheduler.maxPoolSize=256
二、网关核心架构设计
2.1 响应式网关服务器
// 基于Netty的响应式HTTP服务器
@Component
public class ReactiveGatewayServer {
private final DispatcherHandler dispatcherHandler;
private final GatewayMetrics metrics;
public ReactiveGatewayServer(DispatcherHandler dispatcherHandler,
GatewayMetrics metrics) {
this.dispatcherHandler = dispatcherHandler;
this.metrics = metrics;
}
public Mono<Void> start(int port) {
return HttpServer.create()
.port(port)
.handle(this::handleRequest)
.bind()
.doOnSuccess(server -> {
log.info("Gateway server started on port: {}", port);
this.server = server;
})
.then();
}
private Mono<Void> handleRequest(HttpServerRequest request,
HttpServerResponse response) {
return Mono.fromCallable(() -> System.nanoTime())
.flatMap(startTime -> {
ServerWebExchange exchange = createExchange(request, response);
return dispatcherHandler.handle(exchange)
.doOnSuccess(v -> recordMetrics(startTime, exchange, true))
.doOnError(e -> recordMetrics(startTime, exchange, false));
})
.onErrorResume(e -> handleError(response, e));
}
private ServerWebExchange createExchange(HttpServerRequest request,
HttpServerResponse response) {
ServerHttpRequest httpRequest = new ReactorHttpRequestAdapter(request);
ServerHttpResponse httpResponse = new ReactorHttpResponseAdapter(response);
return new DefaultServerWebExchange(httpRequest, httpResponse,
new InMemoryWebSessionManager());
}
}
// 高性能JSON序列化配置
@Configuration
public class JacksonConfig {
@Bean
public Jackson2ObjectMapperBuilderCustomizer jacksonCustomizer() {
return builder -> {
builder.featuresToEnable(StreamReadFeature.USE_FAST_BIG_DECIMAL_PARSER);
builder.featuresToEnable(StreamWriteFeature.USE_FAST_DOUBLE_WRITER);
builder.featuresToEnable(JsonWriteFeature.WRITE_NUMBERS_AS_STRINGS);
builder.modulesToInstall(new JavaTimeModule());
};
}
}
2.2 动态路由引擎
// 基于虚拟线程的动态路由服务
@Service
public class VirtualThreadRoutingService {
private final RouteRepository routeRepository;
private final LoadBalancerClient loadBalancer;
private final ConcurrentHashMap<String, Route> routeCache =
new ConcurrentHashMap<>();
@Async("virtualThreadExecutor")
public CompletableFuture<RouteResult> routeAsync(HttpRequest request) {
return CompletableFuture.supplyAsync(() -> route(request),
Thread.ofVirtual().factory());
}
public RouteResult route(HttpRequest request) {
String path = request.getPath();
String method = request.getMethod().name();
// 从缓存中获取路由
Route route = routeCache.computeIfAbsent(
buildRouteKey(path, method),
k -> findRouteFromRepository(path, method)
);
if (route == null) {
throw new RouteNotFoundException("No route found for: " + path);
}
// 负载均衡选择服务实例
ServiceInstance instance = loadBalancer.choose(route.getServiceId());
return RouteResult.builder()
.route(route)
.instance(instance)
.targetUrl(buildTargetUrl(instance, request))
.build();
}
private Route findRouteFromRepository(String path, String method) {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
var future = executor.submit(() ->
routeRepository.findByPathAndMethod(path, method));
return future.get(100, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("Failed to find route from repository", e);
return null;
}
}
// 路由规则定义
@Data
@Builder
public static class Route {
private String id;
private String path;
private String method;
private String serviceId;
private List<FilterDefinition> filters;
private Map<String, Object> metadata;
private int order;
private boolean enabled;
}
}
三、过滤器链与限流熔断
3.1 虚拟线程友好的过滤器链
// 网关过滤器接口
public interface GatewayFilter {
String getName();
Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
default int getOrder() {
return 0;
}
}
// 基于虚拟线程的过滤器链执行器
@Component
public class VirtualThreadFilterChain implements GatewayFilterChain {
private final List<GatewayFilter> filters;
private final GatewayMetrics metrics;
private int index = 0;
@Override
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.fromCompletionStage(CompletableFuture.supplyAsync(() -> {
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index++);
String filterName = filter.getName();
return metrics.recordFilterTime(filterName, () ->
filter.filter(exchange, this).toFuture()
).thenCompose(Function.identity());
} else {
return CompletableFuture.completedFuture(null);
}
}, Thread.ofVirtual().factory())).then();
}
}
// 认证过滤器实现
@Component
public class AuthenticationFilter implements GatewayFilter {
private final JwtTokenProvider tokenProvider;
private final RedisTemplate<String, String> redisTemplate;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return Mono.fromCallable(() -> {
String token = extractToken(exchange.getRequest());
if (token == null) {
throw new AuthenticationException("Missing authentication token");
}
// 使用虚拟线程执行阻塞操作
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
var future = executor.submit(() -> {
if (!tokenProvider.validateToken(token)) {
throw new AuthenticationException("Invalid token");
}
String username = tokenProvider.getUsername(token);
String blacklisted = redisTemplate.opsForValue()
.get("token:blacklist:" + token);
if (blacklisted != null) {
throw new AuthenticationException("Token is blacklisted");
}
// 将用户信息添加到请求头
exchange.getRequest().mutate()
.header("X-User-Id", username)
.build();
return null;
});
return future.get(500, TimeUnit.MILLISECONDS);
}
}).then(chain.filter(exchange));
}
}
3.2 分布式限流与熔断
// 基于Redis的分布式限流器
@Component
public class RedisRateLimiter {
private final RedisTemplate<String, String> redisTemplate;
private final ScriptExecutor<String> scriptExecutor;
public boolean isAllowed(String key, int maxRequests, long windowInSeconds) {
String luaScript = """
local key = KEYS[1]
local max = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call('GET', key)
local now = redis.call('TIME')
local currentTime = tonumber(now[1]) * 1000 + tonumber(now[2]) / 1000
if current == false then
redis.call('SET', key, 1)
redis.call('PEXPIRE', key, window * 1000)
return 1
end
local requests = tonumber(current)
if requests < max then
redis.call('INCR', key)
return 1
else
return 0
end
""";
List<String> keys = Collections.singletonList(key);
List<String> args = Arrays.asList(
String.valueOf(maxRequests),
String.valueOf(windowInSeconds)
);
Long result = redisTemplate.execute(
new DefaultRedisScript<>(luaScript, Long.class),
keys, args.toArray()
);
return result != null && result == 1;
}
}
// 熔断器实现
@Component
public class CircuitBreaker {
private final Map<String, CircuitState> states = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory());
@Data
private static class CircuitState {
private State state = State.CLOSED;
private int failureCount = 0;
private long lastFailureTime = 0;
private long lastAttemptTime = 0;
private final int failureThreshold;
private final long timeout;
enum State { CLOSED, OPEN, HALF_OPEN }
}
public <T> Mono<T> execute(String serviceName, Mono<T> operation) {
CircuitState state = states.computeIfAbsent(serviceName,
k -> new CircuitState(5, 30000L)); // 5次失败,30秒超时
if (state.getState() == CircuitState.State.OPEN) {
if (System.currentTimeMillis() - state.getLastAttemptTime() > state.getTimeout()) {
state.setState(CircuitState.State.HALF_OPEN);
state.setLastAttemptTime(System.currentTimeMillis());
} else {
return Mono.error(new CircuitBreakerOpenException("Circuit breaker is open"));
}
}
return operation
.doOnSuccess(result -> onSuccess(state))
.doOnError(error -> onError(state, error))
.onErrorResume(error -> handleCircuitState(state, error));
}
private void onSuccess(CircuitState state) {
state.setFailureCount(0);
state.setState(CircuitState.State.CLOSED);
}
private void onError(CircuitState state, Throwable error) {
state.setFailureCount(state.getFailureCount() + 1);
state.setLastFailureTime(System.currentTimeMillis());
if (state.getFailureCount() >= state.getFailureThreshold()) {
state.setState(CircuitState.State.OPEN);
state.setLastAttemptTime(System.currentTimeMillis());
// 安排状态检查
scheduler.schedule(() -> {
if (state.getState() == CircuitState.State.OPEN) {
state.setState(CircuitState.State.HALF_OPEN);
}
}, state.getTimeout(), TimeUnit.MILLISECONDS);
}
}
}
四、服务发现与负载均衡
4.1 响应式服务注册发现
// 基于虚拟线程的服务注册中心客户端
@Component
public class VirtualThreadDiscoveryClient implements DiscoveryClient {
private final WebClient webClient;
private final ScheduledExecutorService heartbeatExecutor;
private final String serviceName;
private final String instanceId;
@EventListener(ApplicationReadyEvent.class)
public void registerService() {
// 使用虚拟线程执行服务注册
Thread.startVirtualThread(() -> {
ServiceInstance instance = createServiceInstance();
while (!Thread.currentThread().isInterrupted()) {
try {
webClient.post()
.uri("/registry/register")
.bodyValue(instance)
.retrieve()
.bodyToMono(Void.class)
.block();
// 发送心跳
sendHeartbeat();
Thread.sleep(30000); // 30秒心跳间隔
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
log.warn("Service registration failed, retrying...", e);
sleepWithBackoff();
}
}
});
}
private void sendHeartbeat() {
CompletableFuture.runAsync(() -> {
try {
webClient.put()
.uri("/registry/heartbeat/{instanceId}", instanceId)
.retrieve()
.bodyToMono(Void.class)
.block();
} catch (Exception e) {
log.warn("Heartbeat failed", e);
}
}, Thread.ofVirtual().factory());
}
@Override
public List<ServiceInstance> getInstances(String serviceId) {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
var future = executor.submit(() ->
webClient.get()
.uri("/registry/services/{serviceId}", serviceId)
.retrieve()
.bodyToFlux(ServiceInstance.class)
.collectList()
.block()
);
return future.get(1000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.error("Failed to get service instances", e);
return Collections.emptyList();
}
}
}
// 负载均衡策略
@Component
public class WeightedLoadBalancer implements LoadBalancer {
private final Random random = new Random();
@Override
public ServiceInstance choose(String serviceId, List<ServiceInstance> instances) {
if (instances.isEmpty()) {
throw new NoAvailableInstanceException("No available instances for: " + serviceId);
}
// 计算总权重
int totalWeight = instances.stream()
.mapToInt(instance -> getWeight(instance))
.sum();
// 随机选择
int randomWeight = random.nextInt(totalWeight);
int currentWeight = 0;
for (ServiceInstance instance : instances) {
currentWeight += getWeight(instance);
if (randomWeight < currentWeight) {
return instance;
}
}
return instances.get(instances.size() - 1);
}
private int getWeight(ServiceInstance instance) {
Object weight = instance.getMetadata().get("weight");
return weight != null ? Integer.parseInt(weight.toString()) : 1;
}
}
五、监控与可观测性
5.1 基于Micrometer的指标收集
// 网关指标监控
@Component
public class GatewayMetrics {
private final MeterRegistry meterRegistry;
private final Timer requestTimer;
private final Counter errorCounter;
private final Gauge routeGauge;
public GatewayMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.requestTimer = Timer.builder("gateway.requests")
.description("Gateway request processing time")
.publishPercentiles(0.5, 0.95, 0.99)
.register(meterRegistry);
this.errorCounter = Counter.builder("gateway.errors")
.description("Gateway error count")
.tag("type", "gateway")
.register(meterRegistry);
this.routeGauge = Gauge.builder("gateway.routes.active")
.description("Active routes count")
.register(meterRegistry);
}
public <T> Mono<T> recordRequestTime(Mono<T> mono, String routeId) {
return Mono.fromCallable(System::nanoTime)
.flatMap(startTime -> mono
.doOnSuccess(v -> recordSuccess(startTime, routeId))
.doOnError(e -> recordError(startTime, routeId, e))
);
}
private void recordSuccess(long startTime, String routeId) {
long duration = System.nanoTime() - startTime;
requestTimer.record(duration, TimeUnit.NANOSECONDS,
Tags.of(Tag.of("route", routeId), Tag.of("status", "success")));
}
private void recordError(long startTime, String routeId, Throwable error) {
long duration = System.nanoTime() - startTime;
requestTimer.record(duration, TimeUnit.NANOSECONDS,
Tags.of(Tag.of("route", routeId), Tag.of("status", "error")));
errorCounter.increment();
}
}
// 分布式链路追踪
@Component
public class GatewayTracing {
private final Tracer tracer;
public Span startSpan(String name, HttpRequest request) {
Span span = tracer.spanBuilder(name)
.setAttribute("http.method", request.getMethod().name())
.setAttribute("http.path", request.getPath())
.setAttribute("http.headers", request.getHeaders().toString())
.startSpan();
// 从请求头中提取跟踪信息
String traceId = request.getHeader("X-Trace-Id");
if (traceId != null) {
span.setAttribute("trace.id", traceId);
}
return span;
}
public void endSpan(Span span, HttpResponse response) {
span.setAttribute("http.status_code", response.getStatusCode().value());
span.end();
}
}
5.2 性能测试与优化
// 虚拟线程性能测试
@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class VirtualThreadPerformanceTest {
@Autowired
private WebTestClient webTestClient;
@Test
void testHighConcurrencyWithVirtualThreads() {
int concurrentRequests = 10000;
var futures = new CompletableFuture[concurrentRequests];
long startTime = System.currentTimeMillis();
// 使用虚拟线程执行并发请求
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < concurrentRequests; i++) {
final int index = i;
futures[i] = CompletableFuture.supplyAsync(() -> {
return webTestClient.get()
.uri("/api/test/" + index)
.exchange()
.expectStatus().isOk()
.returnResult(String.class);
}, executor);
}
// 等待所有请求完成
CompletableFuture.allOf(futures).join();
}
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
System.out.printf("Processed %d requests in %d ms (%d req/s)%n",
concurrentRequests, duration, concurrentRequests * 1000L / duration);
}
}
// JVM监控配置
@Configuration
public class MonitoringConfig {
@Bean
public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
return registry -> registry.config().commonTags(
"application", "gateway-system",
"environment", "production",
"region", System.getenv("REGION")
);
}
@Bean
public InfluxMeterRegistry influxMeterRegistry() {
return InfluxMeterRegistry.builder(InfluxConfig.DEFAULT)
.clock(Clock.SYSTEM)
.build();
}
}
六、部署与生产实践
6.1 Docker容器化部署
# Dockerfile
FROM eclipse-temurin:21-jre-jammy
# 设置JVM参数
ENV JAVA_OPTS="-XX:+UseZGC -XX:MaxRAMPercentage=75 -Dspring.threads.virtual.enabled=true"
# 创建非root用户
RUN groupadd -r gateway && useradd -r -g gateway gateway
USER gateway
# 复制应用
COPY target/gateway-system.jar app.jar
# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=60s --retries=3
CMD curl -f http://localhost:8080/actuator/health || exit 1
# 启动应用
ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -jar /app.jar"]
# docker-compose.yml
version: '3.8'
services:
gateway:
build: .
ports:
- "8080:8080"
environment:
- SPRING_PROFILES_ACTIVE=production
- REDIS_HOST=redis-cluster
- POSTGRES_HOST=postgres
deploy:
resources:
limits:
memory: 2G
reservations:
memory: 1G
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/actuator/health"]
interval: 30s
timeout: 10s
retries: 3
6.2 生产环境配置
# application-production.yml
server:
port: 8080
netty:
connection-timeout: 30s
idle-timeout: 60s
spring:
threads:
virtual:
enabled: true
data:
redis:
cluster:
nodes: redis-cluster:6379
timeout: 2s
lettuce:
pool:
max-active: 32
max-idle: 16
min-idle: 8
management:
endpoints:
web:
exposure:
include: health,metrics,info,prometheus
endpoint:
health:
show-details: always
metrics:
enabled: true
gateway:
rate-limiter:
enabled: true
default-requests-per-second: 1000
circuit-breaker:
enabled: true
failure-threshold: 5
timeout: 30000
总结
本教程详细介绍了基于Java 21虚拟线程技术构建高性能微服务网关系统的完整方案。通过虚拟线程、响应式编程、分布式限流等现代Java技术,实现了能够支撑亿级并发的高性能网关系统。
系统核心优势:
- 基于Java 21虚拟线程,实现百万级并发连接
- 响应式架构,非阻塞IO处理
- 动态路由与服务发现
- 分布式限流与熔断保护
- 完整的监控与可观测性
- 容器化部署与弹性伸缩
该系统展示了Java在现代微服务架构中的强大能力,为构建高性能、高可用的分布式系统提供了完整的技术方案。开发者可以基于此架构扩展更多高级功能,如API聚合、协议转换、安全防护等企业级需求。

