Published: October 15, 2024
I. System Architecture Design
This tutorial builds a complete real-time stock analysis platform on a microservice architecture:
- Data ingestion layer: real-time market data over WebSocket
- Stream processing layer: Kafka + Spark Streaming
- Compute engine: complex event processing with Flink
- Prediction service: model inference with TensorFlow Java
- Microservice framework: Spring Cloud Alibaba
Tech stack: Java 17 + Spring Boot 3 + Kafka + Spark + Flink + TensorFlow Java
II. Project Initialization and Configuration
1. Microservice module layout
stock-system/
├── stock-gateway/     # API gateway
├── stock-service/     # Core stock service
├── data-collector/    # Data ingestion
├── stream-processor/  # Stream processing
├── ml-predictor/      # Prediction service
├── alert-engine/      # Alerting engine
└── config-center/     # Configuration center
2. Parent POM core dependencies
<!-- stock-system/pom.xml -->
<properties>
    <java.version>17</java.version>
    <spring-boot.version>3.0.0</spring-boot.version>
    <spring-cloud.version>2022.0.0</spring-cloud.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
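The `StockTick` event type is used by every module below but never defined in the tutorial. A minimal sketch follows; the field names are inferred from the getters used in later snippets (`getSymbol`, `getPrice`, `getVolume`, `getTimestamp`, `getPriceChangePercent`) and should be adjusted to match your actual market-data feed.

```java
// Hypothetical sketch: a shared StockTick model, e.g.
// stock-system/common/src/main/java/com/stock/common/StockTick.java
public class StockTick {
    private String symbol;
    private double price;
    private long volume;
    private long timestamp;            // event time in epoch milliseconds
    private double priceChangePercent; // change vs. previous tick, in percent

    public StockTick() { }             // no-arg constructor for JSON (de)serializers

    public StockTick(String symbol, double price, long volume, long timestamp) {
        this.symbol = symbol;
        this.price = price;
        this.volume = volume;
        this.timestamp = timestamp;
    }

    public String getSymbol() { return symbol; }
    public double getPrice() { return price; }
    public long getVolume() { return volume; }
    public long getTimestamp() { return timestamp; }
    public double getPriceChangePercent() { return priceChangePercent; }

    public void setSymbol(String symbol) { this.symbol = symbol; }
    public void setPrice(double price) { this.price = price; }
    public void setVolume(long volume) { this.volume = volume; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public void setPriceChangePercent(double p) { this.priceChangePercent = p; }
}
```

Keeping the class a plain JavaBean (no-arg constructor plus getters/setters) lets both Spring Kafka's `JsonSerializer` and Jackson handle it without extra configuration.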
III. Real-Time Data Ingestion
1. WebSocket market data client
// data-collector/src/main/java/com/stock/collector/WebSocketClient.java
@Slf4j
@Component
@ClientEndpoint  // required so the container invokes the @OnOpen/@OnMessage callbacks
public class WebSocketClient {

    private final KafkaTemplate<String, StockTick> kafkaTemplate;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private Session session;

    public WebSocketClient(KafkaTemplate<String, StockTick> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostConstruct
    public void connect() {
        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
        String wsUrl = "wss://api.marketdata.com/v1/stream";
        try {
            container.connectToServer(this, URI.create(wsUrl));
        } catch (Exception e) {
            log.error("WebSocket connection failed", e);
        }
    }

    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        log.info("WebSocket connection established");
        subscribeSymbols(Arrays.asList("AAPL", "MSFT", "GOOGL"));
    }

    private void subscribeSymbols(List<String> symbols) {
        // Note the escaped quotes: without them this string does not compile
        String subscriptionMsg = String.format(
                "{\"action\":\"subscribe\",\"symbols\":\"%s\"}",
                String.join(",", symbols));
        try {
            session.getBasicRemote().sendText(subscriptionMsg);
        } catch (IOException e) {
            log.error("Failed to send subscription message", e);
        }
    }

    @OnMessage
    public void onMessage(String message) {
        StockTick tick = parseTick(message);
        if (tick != null) {
            kafkaTemplate.send("stock-ticks", tick.getSymbol(), tick);
        }
    }

    private StockTick parseTick(String json) {
        try {
            return objectMapper.readValue(json, StockTick.class);
        } catch (JsonProcessingException e) {
            log.warn("Failed to parse tick: {}", json, e);
            return null;
        }
    }
}
2. Kafka producer configuration
# data-collector/src/main/resources/application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        spring.json.add.type.headers: false
app:
  topics:
    stock-ticks: stock-ticks
IV. Stream Processing Engine
1. Spark Streaming job
// stream-processor/src/main/java/com/stock/stream/StockStreamProcessor.java
public class StockStreamProcessor {

    private static final Logger log =
            LoggerFactory.getLogger(StockStreamProcessor.class);

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("StockStreamProcessor")
                .setMaster("local[*]");  // use a real master URL outside local testing
        JavaStreamingContext jssc = new JavaStreamingContext(
                conf, Durations.seconds(5));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StockTickDeserializer.class);
        kafkaParams.put("group.id", "stock-group");
        kafkaParams.put("auto.offset.reset", "latest");

        Collection<String> topics = Arrays.asList("stock-ticks");
        JavaInputDStream<ConsumerRecord<String, StockTick>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, StockTick>Subscribe(topics, kafkaParams));

        // Aggregate per-symbol volume over a 1-minute window, sliding every 30 seconds
        stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()))
                .reduceByKeyAndWindow(
                        (tick1, tick2) -> new StockTick(
                                tick1.getSymbol(),
                                tick2.getPrice(),                       // keep the latest price
                                tick1.getVolume() + tick2.getVolume(),  // sum the volumes
                                tick2.getTimestamp()),
                        Durations.minutes(1),
                        Durations.seconds(30))
                .foreachRDD(rdd -> rdd.foreach(record ->
                        System.out.printf("%s: volume=%d%n",
                                record._1, record._2.getVolume())));

        jssc.start();
        try {
            jssc.awaitTermination();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Stream processing interrupted", e);
        }
    }
}
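The reduce function above merges two ticks by keeping the latest price and timestamp while summing volumes. The merge semantics can be checked in isolation with plain Java (a sketch; `Tick` is a local stand-in for `StockTick`, not part of the project):

```java
// Standalone illustration of the window-merge logic passed to
// reduceByKeyAndWindow above; Tick is a local stand-in for StockTick.
record Tick(String symbol, double price, long volume, long timestamp) { }

class WindowMerge {
    // Merge two ticks of the same symbol: latest price, summed volume,
    // latest timestamp -- mirroring the Spark reduce function
    static Tick merge(Tick earlier, Tick later) {
        return new Tick(
                earlier.symbol(),
                later.price(),
                earlier.volume() + later.volume(),
                later.timestamp());
    }
}
```

Because `reduceByKeyAndWindow` may apply the function in any association order, only the summed field (volume) is truly order-independent; "latest price" is correct only as long as ticks arrive in timestamp order within a partition.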
2. Complex event processing with Flink
// stream-processor/src/main/java/com/stock/stream/StockPatternDetector.java
public class StockPatternDetector {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<StockTick> source = KafkaSource.<StockTick>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("stock-ticks")
                .setDeserializer(new StockTickDeserializationSchema())
                .setGroupId("pattern-detector")
                .build();

        DataStream<StockTick> ticks = env.fromSource(
                source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Detect a price-spike pattern: three or more consecutive ticks,
        // each gaining more than 1%
        Pattern<StockTick, ?> spikePattern = Pattern.<StockTick>begin("start")
                .where(new SimpleCondition<>() {
                    @Override
                    public boolean filter(StockTick tick) {
                        return tick.getPriceChangePercent() > 1.0;
                    }
                })
                .timesOrMore(3)
                .consecutive();

        PatternStream<StockTick> patternStream = CEP.pattern(
                ticks.keyBy(StockTick::getSymbol), spikePattern);

        DataStream<Alert> alerts = patternStream.process(
                new PatternProcessFunction<StockTick, Alert>() {
                    @Override
                    public void processMatch(
                            Map<String, List<StockTick>> match,
                            Context ctx,
                            Collector<Alert> out) {
                        List<StockTick> spikes = match.get("start");
                        out.collect(new Alert(
                                "Price spike alert",
                                spikes.get(0).getSymbol(),
                                spikes.get(spikes.size() - 1).getPrice()));
                    }
                });

        // KafkaSink has no no-arg constructor; build it and attach via sinkTo.
        // AlertSerializationSchema is a custom SerializationSchema<Alert>
        // (e.g. Jackson-based), analogous to StockTickDeserializationSchema above.
        alerts.sinkTo(KafkaSink.<Alert>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.<Alert>builder()
                        .setTopic("stock-alerts")
                        .setValueSerializationSchema(new AlertSerializationSchema())
                        .build())
                .build());

        env.execute("Stock Pattern Detection");
    }
}
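The CEP pattern fires when three or more consecutive ticks each gain more than 1%. As a sanity check, the same rule can be expressed in plain Java over an array of percentage changes (a sketch independent of Flink; `SpikeDetector` is illustrative, not part of the project):

```java
class SpikeDetector {
    // Returns true if the sequence contains at least `minRun` consecutive
    // values strictly greater than `thresholdPercent` -- the same condition
    // the Flink CEP pattern (timesOrMore(3).consecutive()) matches on.
    static boolean hasSpikeRun(double[] changesPercent, int minRun, double thresholdPercent) {
        int run = 0;
        for (double change : changesPercent) {
            run = (change > thresholdPercent) ? run + 1 : 0;
            if (run >= minRun) {
                return true;
            }
        }
        return false;
    }
}
```

Note that `consecutive()` is what makes the run strict: without it, Flink would also match non-contiguous spikes, which this plain-Java version (resetting `run` to zero on any non-spike tick) does not.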
V. Microservice Implementation
1. Spring Cloud Gateway configuration
# stock-gateway/src/main/resources/application.yml
spring:
  cloud:
    gateway:
      routes:
        - id: stock-service
          uri: lb://stock-service
          predicates:
            - Path=/api/stocks/**
          filters:
            - StripPrefix=1
            - name: CircuitBreaker
              args:
                name: stockCircuitBreaker
                fallbackUri: forward:/fallback/stock
        - id: alert-service
          uri: lb://alert-engine
          predicates:
            - Path=/api/alerts/**
          filters:
            - StripPrefix=1
    nacos:
      discovery:
        server-addr: localhost:8848
2. Stock query service
// stock-service/src/main/java/com/stock/service/StockService.java
@Service
@RequiredArgsConstructor
public class StockService {

    private final StockRepository stockRepository;
    // Value type matches what refreshHotStocks() stores: a list, not a single summary
    private final RedisTemplate<String, List<StockSummary>> redisTemplate;

    @Cacheable(value = "stockSummary", key = "#symbol")
    public StockSummary getStockSummary(String symbol) {
        return stockRepository.findSummaryBySymbol(symbol)
                .orElseThrow(() -> new StockNotFoundException(symbol));
    }

    @Scheduled(fixedRate = 60000)  // refresh every 60 seconds
    public void refreshHotStocks() {
        List<StockSummary> hotStocks = stockRepository.findTop10ByOrderByVolumeDesc();
        redisTemplate.opsForValue().set("hotStocks", hotStocks);
    }
}
// Controller
@RestController
@RequestMapping("/api/stocks")
@RequiredArgsConstructor
public class StockController {

    private final StockService stockService;

    @GetMapping("/{symbol}")
    public ResponseEntity<StockSummary> getStock(
            @PathVariable String symbol,
            @RequestHeader("X-API-Version") String apiVersion) {
        return ResponseEntity.ok(stockService.getStockSummary(symbol));
    }
}
VI. Machine Learning Prediction
1. Loading a TensorFlow Java model
// ml-predictor/src/main/java/com/stock/ml/StockPredictor.java
@Service
public class StockPredictor {

    private static final String MODEL_PATH = "classpath:models/lstm_predictor";

    private SavedModelBundle model;

    @PostConstruct
    public void init() throws Exception {
        // ResourceUtils.getFile resolves classpath: URLs only when the model
        // sits on the filesystem, i.e. not packaged inside a jar
        this.model = SavedModelBundle.load(
                ResourceUtils.getFile(MODEL_PATH).getAbsolutePath(), "serve");
    }

    public PredictionResult predict(StockHistory history) {
        try (Tensor<Float> input = createInputTensor(history)) {
            List<Tensor<?>> outputs = model.session().runner()
                    .feed("lstm_input", input)
                    .fetch("dense_2/Sigmoid:0")
                    .run();
            try (Tensor<Float> output = outputs.get(0).expect(Float.class)) {
                float[][] predictions = output.copyTo(new float[1][1]);
                return new PredictionResult(predictions[0][0]);
            }
        }
    }

    private Tensor<Float> createInputTensor(StockHistory history) {
        float[][][] inputData = prepareInputData(history);
        return Tensors.create(inputData);
    }
}
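The `prepareInputData` helper is left undefined above. One common choice for LSTM input is min-max normalizing the recent closing prices into a `[batch=1][timesteps][features=1]` array. The sketch below shows that step in plain Java; the window length and feature layout are assumptions and must match whatever shape the saved model was trained with:

```java
class InputPreparation {
    // Min-max normalize a window of closing prices into the
    // [batch=1][timesteps][features=1] shape an LSTM input layer
    // typically expects. The timestep count and single-feature layout
    // are assumptions -- align them with the model's training pipeline.
    static float[][][] prepareInputData(double[] closingPrices) {
        double min = Double.MAX_VALUE;
        double max = -Double.MAX_VALUE;
        for (double p : closingPrices) {
            min = Math.min(min, p);
            max = Math.max(max, p);
        }
        // Guard against a flat window (all prices equal) dividing by zero
        double range = (max - min) == 0 ? 1.0 : (max - min);

        float[][][] input = new float[1][closingPrices.length][1];
        for (int t = 0; t < closingPrices.length; t++) {
            input[0][t][0] = (float) ((closingPrices[t] - min) / range);
        }
        return input;
    }
}
```

Whatever scaling is used here, it must be the same transformation (with the same statistics) that was applied when the model was trained, otherwise the sigmoid output is meaningless.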
2. Prediction REST endpoint
// ml-predictor/src/main/java/com/stock/ml/PredictionController.java
@RestController
@RequestMapping("/api/predictions")
@RequiredArgsConstructor
public class PredictionController {

    private final StockPredictor predictor;
    private final StockHistoryClient historyClient;

    @PostMapping
    public ResponseEntity<PredictionResult> predict(
            @RequestBody PredictionRequest request) {
        StockHistory history = historyClient.getHistory(
                request.getSymbol(),
                request.getDays());
        return ResponseEntity.ok(predictor.predict(history));
    }
}
VII. System Integration and Deployment
1. Docker Compose configuration
# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: zookeeper:3.8
    ports:
      - "2181:2181"
  kafka:
    image: bitnami/kafka:3.4
    ports:
      - "9092:9092"
    environment:
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      ALLOW_PLAINTEXT_LISTENER: "yes"
    depends_on:
      - zookeeper
  spark-master:
    image: bitnami/spark:3.4
    ports:
      - "8080:8080"
    command: >
      bash -c "
      /opt/bitnami/spark/bin/spark-class org.apache.spark.deploy.master.Master
      -h spark-master
      --port 7077
      --webui-port 8080"
  flink-jobmanager:
    image: flink:1.17
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager
  nacos:
    image: nacos/nacos-server:2.2.0
    ports:
      - "8848:8848"
    environment:
      MODE: standalone
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
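The compose file mounts a prometheus.yml that is not shown in the tutorial. A minimal sketch, assuming the Spring Boot services expose metrics through the Actuator/Micrometer Prometheus endpoint and that Flink's Prometheus metrics reporter is enabled (job names, targets, and the Flink reporter port are illustrative assumptions):

# prometheus.yml -- illustrative scrape configuration
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'stock-service'
    metrics_path: /actuator/prometheus   # requires micrometer-registry-prometheus
    static_configs:
      - targets: ['stock-service:8080']
  - job_name: 'flink'
    static_configs:
      - targets: ['flink-jobmanager:9249']  # default PrometheusReporter port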
2. Kubernetes deployment manifests
# k8s/deployment.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: stock-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: stock-service
  template:
    metadata:
      labels:
        app: stock-service
    spec:
      containers:
        - name: stock-service
          image: stock-system/stock-service:1.0.0
          ports:
            - containerPort: 8080
          env:
            - name: SPRING_PROFILES_ACTIVE
              value: prod
            - name: SPRING_CLOUD_NACOS_SERVER_ADDR
              value: nacos:8848
          resources:
            limits:
              cpu: "2"
              memory: 2Gi
            requests:
              cpu: "1"
              memory: 1Gi
---
apiVersion: v1
kind: Service
metadata:
  name: stock-service
spec:
  selector:
    app: stock-service
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8080
VIII. Summary and Extensions
In this tutorial you have covered:
- Real-time financial data ingestion
- Distributed stream processing architecture
- Complex event pattern detection
- Microservice system design
- Deploying machine learning models in Java
Directions for further study:
- Implementing quantitative trading strategies
- Verifying market data on a blockchain
- Applying reinforcement learning models
- Edge-computing deployment