I. System Architecture Design
This tutorial builds a complete real-time data processing system on the Java ecosystem, covering the full path from data ingestion to real-time analytics.
Technology stack:
- Message queue: Kafka 3.0
- Stream processing framework: Flink 1.15
- State management: RocksDB state backend
- Data storage: Apache Doris 1.0
- Monitoring: Prometheus + Grafana
Core functional modules:
- Distributed message ingestion
- Streaming data cleansing
- Real-time aggregation
- Exactly-once processing semantics
- Dynamic scaling
II. Project Initialization and Configuration
1. Development Environment Setup
# Project dependencies (key parts of pom.xml)
<properties>
<flink.version>1.15.0</flink.version>
<kafka.version>3.0.0</kafka.version>
</properties>
<dependencies>
<!-- Flink core dependencies (Flink 1.15 artifacts no longer carry a Scala suffix) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Kafka connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- RocksDB state backend -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
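One dependency the later code relies on is not listed above: the JSONParser in section III calls JSON.parseObject(value, UserBehavior.class), which matches the fastjson API, so the following addition (to be placed inside the <dependencies> block) is an assumption about the intended JSON library. The plain Kafka client classes used in sections IV and V arrive transitively through flink-connector-kafka.
<!-- Assumed JSON library for JSON.parseObject in the pipeline code -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>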
2. Project Directory Structure
src/
├── main/
│   ├── java/
│   │   └── com/
│   │       └── rtdata/
│   │           ├── common/            # Shared utility classes
│   │           ├── config/            # Configuration classes
│   │           ├── job/               # Flink jobs
│   │           │   ├── source/        # Data sources
│   │           │   ├── process/       # Processing logic
│   │           │   └── sink/          # Output targets
│   │           ├── model/             # Data models
│   │           └── MainApp.java       # Main entry point
│   └── resources/
│       ├── application.yml            # Application configuration
│       └── log4j2.xml                 # Logging configuration
├── test/                              # Test code
└── docker/                            # Docker deployment files
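To show how these pieces fit together, here is a minimal sketch of MainApp.java wiring up the classes introduced in the sections below (imports omitted, matching the style of the other listings; the choice of sink is one of the options shown in section IV):
// src/main/java/com/rtdata/MainApp.java -- illustrative wiring of the components below
public class MainApp {
    public static void main(String[] args) throws Exception {
        // Execution environment with RocksDB state backend and checkpointing (section IV)
        StreamExecutionEnvironment env = FlinkConfig.initEnv();

        // Kafka source for the user_behavior topic (section III)
        KafkaSource<String> source = KafkaSourceBuilder.buildKafkaSource("user_behavior");
        DataStream<String> raw = env.fromSource(
            source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Parse -> filter -> extract -> windowed aggregation (section III)
        DataStream<WindowResult> results = DataPipeline.buildPipeline(raw);

        // Write the aggregates back to Kafka (section IV covers exactly-once delivery)
        results.addSink(new KafkaExactlyOnceSink("analytics_result"))
               .name("Exactly-Once Sink")
               .uid("exactly-once-sink");

        env.execute("rtdata-realtime-job");
    }
}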
III. Core Feature Implementation
1. Kafka Source Configuration
// src/main/java/com/rtdata/job/source/KafkaSourceBuilder.java
public class KafkaSourceBuilder {
public static KafkaSource<String> buildKafkaSource(String topic) {
return KafkaSource.<String>builder()
.setBootstrapServers("kafka-server:9092")
.setTopics(topic)
.setGroupId("rtdata-group")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.setProperty("auto.offset.reset", "latest")
.setProperty("enable.auto.commit", "false")
.build();
}
}
// Usage example
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> source = KafkaSourceBuilder.buildKafkaSource("user_behavior");
DataStream<String> stream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"Kafka Source"
);
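The records on the user_behavior topic are JSON strings, and the pipeline in the next subsection deserializes them into a UserBehavior POJO and emits WindowResult aggregates. Neither model class is listed in the tutorial, so the following is a minimal sketch with assumed field names:
// src/main/java/com/rtdata/model/UserBehavior.java -- assumed field set
public class UserBehavior {
    private Long userId;         // user identifier
    private String behaviorType; // e.g. "click", "view", "purchase"
    private long timestamp;      // event time in epoch milliseconds

    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }
    public String getBehaviorType() { return behaviorType; }
    public void setBehaviorType(String behaviorType) { this.behaviorType = behaviorType; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}

// src/main/java/com/rtdata/model/WindowResult.java -- assumed field set
public class WindowResult {
    private String behaviorType; // the key of the aggregated window
    private long count;          // number of events in the window

    public String getBehaviorType() { return behaviorType; }
    public void setBehaviorType(String behaviorType) { this.behaviorType = behaviorType; }
    public long getCount() { return count; }
    public void setCount(long count) { this.count = count; }

    @Override
    public String toString() {
        return "{\"behaviorType\":\"" + behaviorType + "\",\"count\":" + count + "}";
    }
}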
2. Streaming Data Processing Pipeline
// src/main/java/com/rtdata/job/process/DataPipeline.java
public class DataPipeline {
    public static DataStream<WindowResult> buildPipeline(
            DataStream<String> inputStream) {
        // 1. Parse incoming JSON
DataStream<UserBehavior> parsedStream = inputStream
.map(new JSONParser())
.name("JSON Parser")
.uid("json-parser");
        // 2. Filter out invalid records
DataStream<UserBehavior> filteredStream = parsedStream
.filter(new DataFilter())
.name("Data Filter")
.uid("data-filter");
        // 3. Extract key fields
DataStream<Tuple3<Long, String, Integer>> keyedStream = filteredStream
.process(new FieldExtractor())
.name("Field Extractor")
.uid("field-extractor");
        // 4. Windowed aggregation. Note: event-time windows only fire when timestamps and
        //    watermarks have been assigned upstream (see the sketch after this class).
        DataStream<WindowResult> windowStream = keyedStream
            .keyBy(event -> event.f1) // group by behavior type
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new BehaviorCounter())
.name("Window Aggregator")
.uid("window-aggregator");
return windowStream;
}
    // JSON parser
private static class JSONParser
implements MapFunction<String, UserBehavior> {
@Override
public UserBehavior map(String value) throws Exception {
return JSON.parseObject(value, UserBehavior.class);
}
}
    // Data filter
private static class DataFilter implements FilterFunction<UserBehavior> {
@Override
public boolean filter(UserBehavior value) {
return value != null
&& value.getUserId() != null
&& value.getTimestamp() > 0;
}
}
}
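Three pieces the pipeline relies on are not shown above: a timestamp/watermark assignment (the source example earlier uses WatermarkStrategy.noWatermarks(), so the event-time windows would never fire without one), the FieldExtractor, and the BehaviorCounter aggregate. The sketches below are assumptions about how they could look:
// Assign event-time timestamps and watermarks, e.g. right after the JSON parsing step;
// the 30-second out-of-orderness bound is an arbitrary assumption
DataStream<UserBehavior> withTimestamps = parsedStream.assignTimestampsAndWatermarks(
    WatermarkStrategy.<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(30))
        .withTimestampAssigner((event, recordTs) -> event.getTimestamp()));

// src/main/java/com/rtdata/job/process/FieldExtractor.java
// Assumed to emit (userId, behaviorType, 1) tuples for downstream counting
public class FieldExtractor
        extends ProcessFunction<UserBehavior, Tuple3<Long, String, Integer>> {
    @Override
    public void processElement(UserBehavior value, Context ctx,
                               Collector<Tuple3<Long, String, Integer>> out) {
        out.collect(Tuple3.of(value.getUserId(), value.getBehaviorType(), 1));
    }
}

// src/main/java/com/rtdata/job/process/BehaviorCounter.java
// Counts events per behavior type within each window
public class BehaviorCounter
        implements AggregateFunction<Tuple3<Long, String, Integer>, WindowResult, WindowResult> {
    @Override
    public WindowResult createAccumulator() {
        return new WindowResult();
    }

    @Override
    public WindowResult add(Tuple3<Long, String, Integer> value, WindowResult acc) {
        acc.setBehaviorType(value.f1);     // the key of this window
        acc.setCount(acc.getCount() + 1);  // one event
        return acc;
    }

    @Override
    public WindowResult getResult(WindowResult acc) {
        return acc;
    }

    @Override
    public WindowResult merge(WindowResult a, WindowResult b) {
        a.setCount(a.getCount() + b.getCount());
        return a;
    }
}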
IV. State Management and Fault Tolerance
1. RocksDB State Backend Configuration
// src/main/java/com/rtdata/config/FlinkConfig.java
public class FlinkConfig {
    public static StreamExecutionEnvironment initEnv() {
        // Initialize the execution environment
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        // State backend: embedded RocksDB with incremental checkpoints.
        // (In Flink 1.15 the old RocksDBStateBackend constructor is deprecated; the state
        // backend and the checkpoint storage location are configured separately.)
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental checkpoints
        env.getCheckpointConfig().setCheckpointStorage(
            "hdfs://namenode:8020/flink/checkpoints");
        // Checkpoint configuration
        env.enableCheckpointing(60000); // trigger a checkpoint every 60 seconds
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
        env.getCheckpointConfig().setCheckpointTimeout(600000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
        // Restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            3,                              // number of restart attempts
            Time.of(10, TimeUnit.SECONDS)   // delay between attempts
        ));
        return env;
    }
}
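One optional addition not present in the original configuration: retain the latest checkpoint when the job is cancelled, so a manually stopped job can later be restored from it.
// Keep externalized checkpoints on cancellation instead of deleting them with the job
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);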
2. Exactly-Once Semantics
// src/main/java/com/rtdata/job/sink/KafkaExactlyOnceSink.java
// Simplified demonstration: each invoke() opens and commits its own Kafka transaction.
// This makes every single write atomic, but it is NOT coordinated with Flink checkpoints,
// so records replayed after a failure can be written twice. For true end-to-end
// exactly-once delivery, prefer the built-in KafkaSink shown after this example.
public class KafkaExactlyOnceSink implements SinkFunction<WindowResult> {
    private final String topic;
    private transient KafkaProducer<String, String> producer;

    public KafkaExactlyOnceSink(String topic) {
        this.topic = topic;
    }

    @Override
    public void invoke(WindowResult value, Context context) throws Exception {
        if (producer == null) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka-server:9092");
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            props.put("transactional.id", "rtdata-producer-" +
                Thread.currentThread().getId());
            producer = new KafkaProducer<>(props);
            producer.initTransactions();
        }
        try {
            producer.beginTransaction();
            // Send the record
            ProducerRecord<String, String> record =
                new ProducerRecord<>(topic, value.toString());
            producer.send(record);
            // Commit the transaction
            producer.commitTransaction();
        } catch (Exception e) {
            producer.abortTransaction();
            throw e;
        }
    }

    @Override
    public void finish() {
        if (producer != null) {
            producer.close();
        }
    }
}
// Usage example
DataStream<WindowResult> resultStream = ...;
resultStream.addSink(new KafkaExactlyOnceSink("analytics_result"))
.name("Exactly-Once Sink")
.uid("exactly-once-sink");
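Flink 1.15 also ships a transactional KafkaSink whose two-phase commit is coordinated with checkpoints; for end-to-end exactly-once this is the more robust route than the hand-written sink above. A sketch follows (topic name and transactional id prefix are assumptions; the producer transaction timeout may additionally need to be raised to cover the checkpoint interval):
// Built-in exactly-once Kafka sink: transactions are committed on checkpoint completion
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka-server:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("analytics_result")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("rtdata-sink")
    .build();

resultStream
    .map(value -> value.toString()).returns(Types.STRING)
    .sinkTo(kafkaSink)
    .name("Kafka Exactly-Once Sink")
    .uid("kafka-exactly-once-sink");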
V. Dynamic Scaling
1. Kafka Partition Awareness
// src/main/java/com/rtdata/job/source/KafkaPartitionDiscoverer.java
// Standalone utility that queries the current partition list of a topic with the plain
// Kafka client, e.g. to decide whether the job's parallelism still matches the topic.
// It is not a Flink connector interface; the new KafkaSource discovers partitions itself
// (see the property-based configuration below).
public class KafkaPartitionDiscoverer implements AutoCloseable {
    private final String topic;
    private final Properties kafkaProps;
    private KafkaConsumer<?, ?> consumer;

    public KafkaPartitionDiscoverer(String topic, Properties props) {
        this.topic = topic;
        this.kafkaProps = props;
    }

    public List<TopicPartition> discoverPartitions() {
        if (consumer == null) {
            consumer = new KafkaConsumer<>(kafkaProps);
        }
        List<PartitionInfo> partitions = consumer.partitionsFor(topic);
        return partitions.stream()
            .map(p -> new TopicPartition(p.topic(), p.partition()))
            .collect(Collectors.toList());
    }

    @Override
    public void close() {
        if (consumer != null) {
            consumer.close();
        }
    }
}
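A possible way to use the utility when deciding whether to rescale (the property values and the parallelism check are illustrative):
// Compare the topic's partition count with the job's current source parallelism
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "kafka-server:9092");
kafkaProps.put("key.deserializer", StringDeserializer.class.getName());
kafkaProps.put("value.deserializer", StringDeserializer.class.getName());

int currentParallelism = 10; // example value: the source's current parallelism
try (KafkaPartitionDiscoverer discoverer =
         new KafkaPartitionDiscoverer("user_behavior", kafkaProps)) {
    int partitionCount = discoverer.discoverPartitions().size();
    if (partitionCount > currentParallelism) {
        // More partitions than source subtasks: consider rescaling,
        // e.g. stop with a savepoint and resubmit with a higher -p (see subsection 2)
    }
}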
// In the Flink job itself the new KafkaSource handles partition discovery on its own:
// enabling periodic discovery lets it pick up partitions added to the topic at runtime.
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafka-server:9092")
    .setTopics("user_behavior")
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .setProperty("partition.discovery.interval.ms", "10000") // check for new partitions every 10s
    .build();
2. Flink Elastic Scaling Configuration
# Resource and parallelism settings at submission time
./bin/flink run \
  -Djobmanager.memory.process.size=2048m \
  -Dtaskmanager.memory.process.size=4096m \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dparallelism.default=10 \
  -c com.rtdata.MainApp \
  rtdata-job.jar

# Rescaling a running job in Flink 1.15: stop with a savepoint, then resubmit with a
# new parallelism (the REST rescaling endpoint is disabled in this version)
./bin/flink stop --savepointPath hdfs://namenode:8020/flink/savepoints <jobId>
./bin/flink run -s <savepointPath> -p 20 -c com.rtdata.MainApp rtdata-job.jar

// In code: default parallelism plus an upper bound that future rescaling must stay under
env.setParallelism(10);
env.setMaxParallelism(100);
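A further option, not used in the submission command above, is Flink's Reactive Mode (standalone application deployments, Flink 1.13+): the scheduler always runs the job at the parallelism the available TaskManagers can provide, so scaling is done by starting or stopping TaskManagers rather than resubmitting. Roughly, with the job jar placed under usrlib/ (flag details may differ by setup):
# Start the job in Reactive Mode on a standalone application cluster
cp rtdata-job.jar usrlib/
./bin/standalone-job.sh start -Dscheduler-mode=reactive -j usrlib/rtdata-job.jar
# Scale up by starting additional TaskManagers, scale down by stopping them
./bin/taskmanager.sh start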
VI. Monitoring and Alerting
1. Prometheus Metrics Integration
Flink 1.15 ships Prometheus reporters in its flink-metrics-prometheus module, so a hand-written reporter class is not needed. Metric reporters are configured in flink-conf.yaml on the JobManager and TaskManagers rather than in job code; make sure the flink-metrics-prometheus jar is available to the cluster (e.g. under plugins/), then enable the pull-based reporter:
# flink-conf.yaml (JobManager and all TaskManagers)
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249
With this in place, every JobManager and TaskManager exposes its metrics (checkpoint durations, records in/out, backpressure, and so on) over HTTP on port 9249 for Prometheus to scrape. If a push-based setup is preferred, the same module also provides a PrometheusPushGatewayReporter. Job-specific metrics can still be registered in operator code via getRuntimeContext().getMetricGroup() and are exported through the same reporter.
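The Docker Compose file in the next section mounts a prometheus.yml; a minimal scrape configuration matching the pull-based reporter above might look like this (the target host names depend on how Compose names the scaled containers, so treat them as placeholders):
# prometheus.yml
scrape_configs:
  - job_name: 'flink'
    scrape_interval: 10s
    static_configs:
      - targets: ['jobmanager:9249', 'taskmanager:9249']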
VII. Production Deployment
1. Docker Compose Deployment
# docker-compose.yml
version: '3.8'
services:
  jobmanager:
    image: flink:1.15.0-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - ./rtdata-job.jar:/opt/flink/usrlib/rtdata-job.jar
    deploy:
      resources:
        limits:
          memory: 2048M
  taskmanager:
    image: flink:1.15.0-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 3
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    deploy:
      resources:
        limits:
          memory: 4096M
  # ZooKeeper backing the Kafka broker below (referenced via KAFKA_CFG_ZOOKEEPER_CONNECT)
  zookeeper:
    image: bitnami/zookeeper:3.7
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:3.0
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    volumes:
      - grafana-storage:/var/lib/grafana
volumes:
  grafana-storage:
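To bring the stack up and submit the job, something along these lines should work; the entry class matches MainApp from the earlier sections, and the jar path is the volume mount defined above:
# Start the cluster with three TaskManagers (mirrors scale: 3)
docker compose up -d --scale taskmanager=3
# Submit the job from inside the JobManager container
docker compose exec jobmanager flink run -d -c com.rtdata.MainApp /opt/flink/usrlib/rtdata-job.jar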
VIII. Summary and Extensions
This tutorial built a complete real-time data processing system:
- A streaming pipeline from Kafka into Flink
- Exactly-once processing semantics
- A high-performance RocksDB state backend
- Dynamic scaling
- A full monitoring setup
Directions for extension:
- Machine learning model integration
- Complex event pattern detection (CEP)
- Multi-stream join optimization
- Unified batch and stream processing