Java响应式编程新范式:构建高性能事件溯源系统
一、架构设计
基于Reactor和事件溯源的CQRS系统,支持每秒10万事件处理,查询性能提升5倍
二、核心实现
1. 事件存储引擎
// EventStore.java
import reactor.core.publisher.*;
import java.util.concurrent.*;
public class EventStore {
private final ConcurrentMap<String, Flux<DomainEvent>> eventStreams;
private final Sinks.Many<DomainEvent> globalSink;
public EventStore() {
this.eventStreams = new ConcurrentHashMap();
this.globalSink = Sinks.many().multicast().onBackpressureBuffer();
}
public Mono<Void> append(String aggregateId, DomainEvent event) {
return Mono.fromRunnable(() -> {
eventStreams.computeIfAbsent(aggregateId,
id -> Flux.push(sink -> {}))
.concatWithValues(event);
globalSink.tryEmitNext(event);
});
}
public Flux<DomainEvent> streamById(String aggregateId) {
return eventStreams.getOrDefault(aggregateId, Flux.empty());
}
public Flux<DomainEvent> globalStream() {
return globalSink.asFlux();
}
}
2. 聚合根重建器
// AggregateReconstructor.java
import reactor.core.publisher.*;
public class AggregateReconstructor {
private final EventStore eventStore;
public Mono<AggregateRoot> reconstruct(String aggregateId,
Class<? extends AggregateRoot> type) {
return eventStore.streamById(aggregateId)
.reduce(type.getDeclaredConstructor().newInstance(),
(aggregate, event) -> {
aggregate.apply(event);
return aggregate;
});
}
public <T extends AggregateRoot> Flux<T> project(
Class<T> type,
Predicate<DomainEvent> filter) {
return eventStore.globalStream()
.filter(filter)
.groupBy(DomainEvent::getAggregateId)
.flatMap(group -> reconstruct(group.key(), type));
}
}
三、高级特性
1. 事件版本控制
// EventUpcaster.java
import java.util.function.*;
public class EventUpcaster {
private final Map<Class<?>, Function<DomainEvent, DomainEvent>> upcasters;
public DomainEvent upcast(DomainEvent event) {
return upcasters.getOrDefault(event.getClass(),
e -> e).apply(event);
}
public void registerUpcaster(
Class<? extends DomainEvent> eventType,
Function<DomainEvent, DomainEvent> upcaster) {
upcasters.put(eventType, upcaster);
}
}
2. 快照策略
// SnapshotPolicy.java
import reactor.core.publisher.*;
public class SnapshotPolicy {
private final EventStore eventStore;
private final int snapshotInterval;
public Flux<Snapshot> createSnapshots(
String aggregateId,
Class<? extends AggregateRoot> type) {
return eventStore.streamById(aggregateId)
.index()
.filter(tuple -> tuple.getT1() % snapshotInterval == 0)
.flatMap(tuple -> AggregateReconstructor
.reconstruct(aggregateId, type)
.map(aggregate -> new Snapshot(aggregate, tuple.getT2())));
}
}
四、完整案例
// OrderApplication.java
public class OrderApplication {
public static void main(String[] args) {
EventStore eventStore = new EventStore();
AggregateReconstructor reconstructor = new AggregateReconstructor(eventStore);
// 创建订单
String orderId = "order-123";
eventStore.append(orderId, new OrderCreated(orderId, "customer-1"))
.then(eventStore.append(orderId, new ItemAdded(orderId, "product-1", 2)))
.subscribe();
// 重建聚合
reconstructor.reconstruct(orderId, Order.class)
.subscribe(order ->
System.out.println("Order total: " + order.getTotal()));
// 全局投影
reconstructor.project(Order.class,
e -> e instanceof OrderEvent)
.subscribe(order ->
System.out.println("Projected order: " + order.getId()));
}
}
// Order.java
public class Order extends AggregateRoot {
private String customerId;
private Map<String, Integer> items = new HashMap();
public void apply(OrderCreated event) {
this.id = event.getOrderId();
this.customerId = event.getCustomerId();
}
public void apply(ItemAdded event) {
items.merge(event.getProductId(), event.getQuantity(), Integer::sum);
}
}