基于Swoole构建千万级并发金融数据推送系统的完整指南
一、金融行情系统技术选型
主流实时数据处理方案对比:
技术方案 | 延迟 | 并发能力 | 开发复杂度 |
---|---|---|---|
传统轮询 | 高(3-5秒) | 低(<1K QPS) | 低 |
长轮询 | 中(1-2秒) | 中(5K QPS) | 中 |
WebSocket | 低(<0.1秒) | 高(50K+ QPS) | 高 |
gRPC流 | 极低(<0.05秒) | 极高(100K+ QPS) | 极高 |
二、系统架构设计
1. 分布式架构设计
数据源(交易所API) → 采集集群(多协议适配) → 处理中心(实时计算引擎) → 分发节点(WebSocket网关) → 客户端
2. 数据流转流程
行情采集(TCP/UDP) → 协议解析(二进制解码) → 数据清洗(异常值过滤) → 业务处理(涨跌计算) → 压缩传输(Snappy压缩) → 客户端渲染(Canvas绘图)
三、核心模块实现
1. 高性能行情采集服务
<?php
/**
 * Polls HTTP market-data sources concurrently (one Swoole coroutine per
 * source) and keeps a bounded, per-symbol buffer of the most recent ticks.
 *
 * start() must be called from a context where Swoole coroutines can be
 * created; every registered source is then polled until stop() is called.
 */
class MarketDataCollector
{
    /** Maximum number of ticks retained per symbol; older ticks are dropped. */
    private const MAX_TICKS_PER_SYMBOL = 100;

    /** Pause between two polls of the same source, in seconds. */
    private const POLL_INTERVAL_SECONDS = 0.1;

    /** @var array<string, callable> Parser callbacks keyed by source URL. */
    private $sources = [];

    /** @var array<string|int, array<int, mixed>> Recent ticks keyed by symbol, oldest first. */
    private $buffer = [];

    /** @var bool Polling coroutines keep looping while this is true. */
    private $isRunning = false;

    /**
     * Registers a source URL together with the callback that parses its body.
     *
     * @param string   $source Absolute http(s) URL of the feed.
     * @param callable $parser Receives the response body and should return an
     *                         array of ticks keyed by symbol.
     *
     * @throws \InvalidArgumentException When the URL has no host component.
     */
    public function addSource(string $source, callable $parser)
    {
        $parts = parse_url($source);
        if (!is_array($parts) || !isset($parts['host'])) {
            // Fail at registration time instead of inside a polling coroutine.
            throw new \InvalidArgumentException("Invalid market data source URL: {$source}");
        }

        $this->sources[$source] = $parser;
    }

    /**
     * Spawns one polling coroutine per registered source.
     */
    public function start()
    {
        $this->isRunning = true;

        foreach ($this->sources as $source => $parser) {
            // Fully-qualified call instead of the go() alias, which is only
            // available when Swoole short names are enabled.
            \Swoole\Coroutine::create(function () use ($source, $parser) {
                $this->pollSource((string) $source, $parser);
            });
        }
    }

    /**
     * Asks every polling coroutine to exit after its current iteration.
     */
    public function stop(): void
    {
        $this->isRunning = false;
    }

    /**
     * Polling loop for a single source; runs inside its own coroutine.
     */
    private function pollSource(string $source, callable $parser): void
    {
        $parts = parse_url($source);
        $useSsl = strtolower($parts['scheme'] ?? 'http') === 'https';
        $port = $parts['port'] ?? ($useSsl ? 443 : 80);

        // A URL without a path yields no 'path' component; request the root
        // and keep the query string, which PHP_URL_PATH alone would drop.
        $path = $parts['path'] ?? '/';
        if (isset($parts['query'])) {
            $path .= '?' . $parts['query'];
        }

        $client = new \Swoole\Coroutine\Http\Client($parts['host'], $port, $useSsl);

        try {
            while ($this->isRunning) {
                $client->get($path);

                if ($client->statusCode === 200) {
                    $data = $parser($client->body);
                    // Ignore parser results that are not tick maps.
                    if (is_array($data)) {
                        $this->bufferData($data);
                    }
                }

                \Swoole\Coroutine::sleep(self::POLL_INTERVAL_SECONDS);
            }
        } finally {
            $client->close();
        }
    }

    /**
     * Appends each tick to its symbol's buffer and evicts the oldest entry
     * once the buffer exceeds MAX_TICKS_PER_SYMBOL.
     *
     * @param array $data Ticks keyed by symbol.
     */
    private function bufferData(array $data)
    {
        foreach ($data as $symbol => $tick) {
            $this->buffer[$symbol][] = $tick;

            if (count($this->buffer[$symbol]) > self::MAX_TICKS_PER_SYMBOL) {
                array_shift($this->buffer[$symbol]);
            }
        }
    }

    /**
     * Returns the most recent buffered tick of every symbol.
     *
     * @return array Latest tick keyed by symbol; empty when nothing is buffered.
     */
    public function getLatest(): array
    {
        $snapshot = [];
        foreach ($this->buffer as $symbol => $ticks) {
            $snapshot[$symbol] = end($ticks);
        }

        return $snapshot;
    }
}
2. WebSocket推送服务
<?php
/**
 * WebSocket gateway that lets clients subscribe to symbols and pushes ticks
 * to the subscribers of each symbol.
 *
 * Client messages are JSON objects of the form
 * {"action": "subscribe"|"unsubscribe", "symbols": [...]}.
 *
 * NOTE(review): subscriptions are held in a plain PHP array, i.e. in the
 * memory of whichever process handles the event — confirm the server is run
 * with a single worker or move this state to shared storage.
 */
class WebSocketServer
{
    /** @var \Swoole\WebSocket\Server Underlying Swoole server. */
    private $server;

    /** @var array<string|int, array<int, int>> Subscriber fds keyed by symbol, then by fd. */
    private $subscribers = [];

    /**
     * Creates the Swoole server and wires the open/message/close callbacks.
     */
    public function __construct(string $host, int $port)
    {
        $this->server = new \Swoole\WebSocket\Server($host, $port);

        $this->server->on('open', function ($server, $request) {
            $this->log("客户端 {$request->fd} 连接");
        });

        $this->server->on('message', function ($server, $frame) {
            $this->handleMessage($frame->fd, $frame->data);
        });

        $this->server->on('close', function ($server, $fd) {
            $this->unsubscribeAll($fd);
            $this->log("客户端 {$fd} 断开");
        });
    }

    /**
     * Decodes one client frame and applies the requested action.
     * Frames that are not valid JSON objects, or whose "symbols" field is not
     * a list, are ignored rather than crashing the worker.
     */
    private function handleMessage(int $fd, string $message)
    {
        $data = json_decode($message, true);
        if (!is_array($data) || !is_array($data['symbols'] ?? null)) {
            return;
        }

        // Only strings/ints are usable as array keys for the subscription map.
        $symbols = array_values(array_filter(
            $data['symbols'],
            static fn($symbol) => is_string($symbol) || is_int($symbol)
        ));

        switch ($data['action'] ?? '') {
            case 'subscribe':
                $this->subscribe($fd, $symbols);
                break;
            case 'unsubscribe':
                $this->unsubscribe($fd, $symbols);
                break;
        }
    }

    /**
     * Adds a connection to the subscriber set of each given symbol.
     */
    public function subscribe(int $fd, array $symbols)
    {
        foreach ($symbols as $symbol) {
            $this->subscribers[$symbol][$fd] = $fd;
        }
    }

    /**
     * Removes a connection from the subscriber set of each given symbol.
     */
    public function unsubscribe(int $fd, array $symbols)
    {
        foreach ($symbols as $symbol) {
            unset($this->subscribers[$symbol][$fd]);

            // Drop empty sets so the map does not accumulate dead symbols.
            if (empty($this->subscribers[$symbol])) {
                unset($this->subscribers[$symbol]);
            }
        }
    }

    /**
     * Removes a connection from every symbol it is subscribed to.
     */
    private function unsubscribeAll(int $fd)
    {
        $this->unsubscribe($fd, array_keys($this->subscribers));
    }

    /**
     * Pushes each tick to the subscribers of its symbol.
     *
     * @param array $data Ticks keyed by symbol.
     */
    public function broadcast(array $data)
    {
        foreach ($data as $symbol => $tick) {
            if (empty($this->subscribers[$symbol])) {
                continue;
            }

            $payload = json_encode([
                // Numeric symbols become int array keys in PHP; send them as
                // strings so the wire format does not depend on the symbol.
                'symbol' => (string) $symbol,
                'data' => $tick,
                'timestamp' => time()
            ]);
            if ($payload === false) {
                $this->log("行情 {$symbol} 编码失败: " . json_last_error_msg());
                continue;
            }

            $stale = [];
            foreach ($this->subscribers[$symbol] as $fd) {
                // Pushing to a connection that is gone or has not completed
                // the handshake fails; skip it and clean it up afterwards.
                if (!$this->server->isEstablished($fd)) {
                    $stale[] = $fd;
                    continue;
                }
                $this->server->push($fd, $payload);
            }

            foreach ($stale as $fd) {
                $this->unsubscribeAll($fd);
            }
        }
    }

    /**
     * Writes a timestamped line to standard output.
     */
    private function log(string $message)
    {
        echo '[' . date('Y-m-d H:i:s') . '] ' . $message . PHP_EOL;
    }

    /**
     * Starts the server event loop.
     */
    public function start()
    {
        $this->server->start();
    }
}
四、高级功能实现
1. K线合成引擎
<?php
/**
 * Aggregates ticks into OHLCV candles for a fixed set of intervals.
 *
 * Candles are stored per interval and symbol, keyed by the start time of
 * their period.
 * NOTE(review): tick timestamps are assumed to be Unix seconds — confirm
 * against the data source.
 */
class KLineGenerator
{
    /** Length of each supported interval, in seconds. */
    private const PERIOD_SECONDS = [
        '1m' => 60,
        '5m' => 300,
        '15m' => 900,
        '1h' => 3600,
        '4h' => 14400,
        '1d' => 86400,
    ];

    /** Maximum number of candles kept per interval and symbol. */
    private const MAX_CANDLES_PER_SERIES = 1000;

    private $intervals = ['1m', '5m', '15m', '1h', '4h', '1d'];

    /** @var array<string, array<string, array<int, array>>> interval => symbol => start time => candle */
    private $buffers = [];

    public function __construct()
    {
        foreach ($this->intervals as $interval) {
            $this->buffers[$interval] = [];
        }
    }

    /**
     * Folds one tick into the current candle of every interval.
     *
     * @param string $symbol Instrument identifier.
     * @param array  $tick   Must contain 'timestamp', 'price' and 'volume'.
     */
    public function processTick(string $symbol, array $tick)
    {
        $tickTime = (int) $tick['timestamp'];

        foreach ($this->intervals as $interval) {
            $period = $this->getPeriodSeconds($interval);
            // Integer arithmetic keeps the bucket key an int (floor() yields a float).
            $timestamp = $tickTime - ($tickTime % $period);

            if (!isset($this->buffers[$interval][$symbol][$timestamp])) {
                $this->initializeCandle($interval, $symbol, $timestamp, $tick);
            } else {
                $this->updateCandle($interval, $symbol, $timestamp, $tick);
            }

            // The candle's period has already elapsed on the wall clock.
            if (time() - $timestamp >= $period) {
                $this->finalizeCandle($interval, $symbol, $timestamp);
            }
        }
    }

    /**
     * Returns the length of an interval in seconds.
     *
     * @throws \InvalidArgumentException For an interval that is not supported.
     */
    private function getPeriodSeconds(string $interval): int
    {
        if (!isset(self::PERIOD_SECONDS[$interval])) {
            throw new \InvalidArgumentException("Unsupported K-line interval: {$interval}");
        }

        return self::PERIOD_SECONDS[$interval];
    }

    /**
     * Opens a new candle from its first tick and caps the series length.
     */
    private function initializeCandle($interval, $symbol, $timestamp, $tick)
    {
        $this->buffers[$interval][$symbol][$timestamp] = [
            'open' => $tick['price'],
            'high' => $tick['price'],
            'low' => $tick['price'],
            'close' => $tick['price'],
            'volume' => $tick['volume'],
            'start_time' => $timestamp,
            'end_time' => $timestamp + $this->getPeriodSeconds($interval)
        ];

        // Without a cap the series grows for as long as the process lives.
        if (count($this->buffers[$interval][$symbol]) > self::MAX_CANDLES_PER_SERIES) {
            ksort($this->buffers[$interval][$symbol]);
            $this->buffers[$interval][$symbol] = array_slice(
                $this->buffers[$interval][$symbol],
                -self::MAX_CANDLES_PER_SERIES,
                null,
                true
            );
        }
    }

    /**
     * Folds a further tick into an existing candle.
     */
    private function updateCandle($interval, $symbol, $timestamp, $tick)
    {
        $candle = $this->buffers[$interval][$symbol][$timestamp];

        $candle['high'] = max($candle['high'], $tick['price']);
        $candle['low'] = min($candle['low'], $tick['price']);
        $candle['close'] = $tick['price'];
        $candle['volume'] += $tick['volume'];

        $this->buffers[$interval][$symbol][$timestamp] = $candle;
    }

    /**
     * Hook invoked when a tick lands in a candle whose period has already
     * elapsed. It is intentionally a no-op here: the intended side effect
     * (persisting, publishing, ...) is not defined in this file, so
     * subclasses may override it.
     */
    protected function finalizeCandle($interval, $symbol, $timestamp)
    {
    }

    /**
     * Returns up to $limit candles for a symbol, newest first.
     *
     * @return array List of candles; empty for an unknown symbol or interval.
     */
    public function getKLines($symbol, $interval, $limit = 100): array
    {
        $candles = $this->buffers[$interval][$symbol] ?? [];
        // Sort by start time so late ticks cannot break the ordering.
        krsort($candles);

        return array_slice(array_values($candles), 0, $limit);
    }
}
2. 实时风险监控
<?php
/**
 * Keeps a rolling tick history per symbol and raises alerts when the price
 * move or the traded volume within a look-back window exceeds a threshold.
 *
 * NOTE(review): tick timestamps are assumed to be Unix seconds and ticks are
 * assumed to arrive in chronological order — confirm against the feed.
 */
class RiskMonitor
{
    private $alertRules = [
        'price_change' => [
            'threshold' => 0.1, // 10% relative move
            'period' => 300 // 5 minutes
        ],
        'volume_spike' => [
            'threshold' => 3.0, // 3x the average volume
            'period' => 3600 // 1 hour
        ]
    ];

    /** @var array<string, array<int, array>> Ticks per symbol, in arrival order. */
    private $history = [];

    /**
     * Records a tick and evaluates every alert rule for its symbol.
     *
     * @param string $symbol Instrument identifier.
     * @param array  $tick   Must contain 'timestamp', 'price' and 'volume'.
     *
     * @return array|null List of triggered alerts, or null when none fired.
     */
    public function checkRisk(string $symbol, array $tick): ?array
    {
        $alerts = [];
        $this->storeHistory($symbol, $tick);

        // Price-move check.
        $priceChange = $this->calcPriceChange($symbol, $this->alertRules['price_change']['period']);
        if (abs($priceChange) >= $this->alertRules['price_change']['threshold']) {
            $alerts[] = [
                'type' => 'price_change',
                'symbol' => $symbol,
                'value' => $priceChange,
                'threshold' => $this->alertRules['price_change']['threshold']
            ];
        }

        // Volume-spike check.
        $volumeRatio = $this->calcVolumeRatio($symbol, $this->alertRules['volume_spike']['period']);
        if ($volumeRatio >= $this->alertRules['volume_spike']['threshold']) {
            $alerts[] = [
                'type' => 'volume_spike',
                'symbol' => $symbol,
                'value' => $volumeRatio,
                'threshold' => $this->alertRules['volume_spike']['threshold']
            ];
        }

        return empty($alerts) ? null : $alerts;
    }

    /**
     * Appends a tick and discards everything older than 24 hours.
     */
    private function storeHistory(string $symbol, array $tick)
    {
        $this->history[$symbol][] = $tick;

        $cutoff = time() - 86400;
        // array_filter keeps the original keys; re-index so the history stays a list.
        $this->history[$symbol] = array_values(array_filter(
            $this->history[$symbol],
            fn($item) => $item['timestamp'] > $cutoff
        ));
    }

    /**
     * Returns the ticks of a symbol that fall within $period seconds of its
     * most recent tick, in arrival order.
     */
    private function ticksInWindow(string $symbol, int $period): array
    {
        $ticks = $this->history[$symbol] ?? [];
        if ($ticks === []) {
            return [];
        }

        // Anchor the window on the latest tick rather than the wall clock so
        // the result depends only on the recorded data.
        $windowStart = $ticks[count($ticks) - 1]['timestamp'] - $period;

        return array_values(array_filter(
            $ticks,
            static fn($item) => $item['timestamp'] >= $windowStart
        ));
    }

    /**
     * Relative price change between the first and the last tick of the window.
     *
     * @return float E.g. 0.15 for +15%; 0.0 when there is no usable baseline.
     */
    private function calcPriceChange(string $symbol, int $period): float
    {
        $window = $this->ticksInWindow($symbol, $period);
        if ($window === []) {
            return 0.0;
        }

        $first = (float) $window[0]['price'];
        $last = (float) $window[count($window) - 1]['price'];
        if ($first == 0.0) {
            // No meaningful baseline; avoid dividing by zero.
            return 0.0;
        }

        return ($last - $first) / $first;
    }

    /**
     * Ratio of the latest tick's volume to the average volume of the earlier
     * ticks in the window.
     *
     * @return float 0.0 when there are no earlier ticks or their average is not positive.
     */
    private function calcVolumeRatio(string $symbol, int $period): float
    {
        $window = $this->ticksInWindow($symbol, $period);
        if (count($window) < 2) {
            return 0.0;
        }

        // The latest tick is compared against the baseline, not part of it.
        $current = (float) array_pop($window)['volume'];
        $average = array_sum(array_column($window, 'volume')) / count($window);
        if ($average <= 0) {
            return 0.0;
        }

        return $current / $average;
    }
}
五、性能优化策略
1. 内存优化技巧
<?php
/**
 * Key/value store that keeps values in serialized form and only holds a
 * decoded copy of the entries that have been read since the last
 * optimizeMemory() call.
 *
 * igbinary is used when the extension is loaded; otherwise the class falls
 * back to PHP's native serialize()/unserialize() instead of failing with an
 * undefined-function error. Both formats are only ever produced and consumed
 * by this instance.
 */
class MemoryOptimizedStorage
{
    /** @var array<string, array> Decoded values that have been read recently. */
    private $data = [];

    /** @var array<string, string> Serialized form of every stored value. */
    private $serialized = [];

    /**
     * Stores a value in serialized form and drops any decoded copy of it.
     */
    public function store(string $key, array $value)
    {
        $this->serialized[$key] = $this->encode($value);
        unset($this->data[$key]);
    }

    /**
     * Returns the value stored under $key, decoding it on first access.
     *
     * @return array The stored value, or an empty array for an unknown key.
     */
    public function retrieve(string $key): array
    {
        if (!isset($this->data[$key]) && isset($this->serialized[$key])) {
            $this->data[$key] = $this->decode($this->serialized[$key]);
        }

        return $this->data[$key] ?? [];
    }

    /**
     * Releases every decoded copy; values stay available in serialized form.
     */
    public function optimizeMemory()
    {
        foreach ($this->data as $key => $value) {
            if (!isset($this->serialized[$key])) {
                $this->serialized[$key] = $this->encode($value);
            }
        }

        $this->data = [];
    }

    /**
     * Serializes a value with igbinary when available, natively otherwise.
     */
    private function encode(array $value): string
    {
        return function_exists('igbinary_serialize')
            ? (string) igbinary_serialize($value)
            : serialize($value);
    }

    /**
     * Reverses encode(); anything that does not decode to an array is treated
     * as an empty value so retrieve() can honour its array return type.
     */
    private function decode(string $blob): array
    {
        $value = function_exists('igbinary_unserialize')
            ? igbinary_unserialize($blob)
            : unserialize($blob);

        return is_array($value) ? $value : [];
    }
}
/**
 * String key/value cache kept as one JSON document in a System V shared
 * memory segment (shmop), so the data can be reached from other processes
 * that open the same segment.
 *
 * The segment has a fixed size; the JSON document is padded with NUL bytes
 * up to that size. No locking is performed, so concurrent writers can
 * overwrite each other's updates.
 */
class SharedMemoryCache
{
    /** @var resource|\Shmop Handle returned by shmop_open(). */
    private $shmId;

    /**
     * Opens the segment identified by $key, creating it when it does not exist.
     *
     * @param string $key  Name of the cache. A single-character key is passed
     *                     to ftok() unchanged; longer keys are mapped to one
     *                     character, because ftok() only accepts a
     *                     one-character project id.
     * @param int    $size Size of the segment in bytes.
     *
     * @throws \RuntimeException When the IPC key or the segment cannot be obtained.
     */
    public function __construct(string $key, int $size)
    {
        $projectId = strlen($key) === 1 ? $key : chr(crc32($key) % 255 + 1);

        $ipcKey = ftok(__FILE__, $projectId);
        if ($ipcKey === -1) {
            throw new \RuntimeException("Unable to derive an IPC key for cache '{$key}'");
        }

        $segment = shmop_open($ipcKey, "c", 0644, $size);
        if ($segment === false) {
            throw new \RuntimeException("Unable to open shared memory segment for cache '{$key}'");
        }

        $this->shmId = $segment;
    }

    /**
     * Adds or replaces one entry and rewrites the whole document.
     *
     * @throws \RuntimeException When the document cannot be encoded, does not
     *                           fit into the segment, or cannot be written.
     */
    public function store(string $key, string $value)
    {
        $data = $this->loadAll();
        $data[$key] = $value;

        $json = json_encode($data);
        if ($json === false) {
            throw new \RuntimeException('Unable to encode cache data: ' . json_last_error_msg());
        }

        $capacity = shmop_size($this->shmId);
        if (strlen($json) > $capacity) {
            throw new \RuntimeException(
                'Cache data (' . strlen($json) . " bytes) exceeds the segment size ({$capacity} bytes)"
            );
        }

        // Pad to the full segment so bytes of a longer, earlier document
        // cannot survive behind the new one.
        if (shmop_write($this->shmId, str_pad($json, $capacity, "\0"), 0) === false) {
            throw new \RuntimeException('Unable to write to the shared memory segment');
        }
    }

    /**
     * Reads and decodes the whole document.
     *
     * @return array All entries; empty when the segment holds no valid document.
     */
    public function loadAll(): array
    {
        $raw = shmop_read($this->shmId, 0, shmop_size($this->shmId));
        if (!is_string($raw)) {
            return [];
        }

        // The segment is NUL-padded; json_decode() rejects trailing NUL bytes.
        $data = json_decode(rtrim($raw, "\0"), true);

        return is_array($data) ? $data : [];
    }
}
2. 协程并发控制
<?php
/**
 * Runs submitted tasks in Swoole coroutines while limiting how many of them
 * execute at the same time.
 *
 * A channel with $maxWorkers capacity acts as the semaphore: a task pushes a
 * token before it runs (blocking while the channel is full) and pops one
 * when it is done.
 */
class CoroutinePool
{
    private $maxWorkers;

    /** @var \Swoole\Coroutine\Channel Token channel limiting concurrency. */
    private $channel;

    /** @var int Tasks that were submitted and have not finished yet. */
    private $running = 0;

    public function __construct(int $maxWorkers)
    {
        $this->maxWorkers = $maxWorkers;
        $this->channel = new \Swoole\Coroutine\Channel($maxWorkers);
    }

    /**
     * Schedules a task; it starts as soon as a worker slot is free.
     *
     * @param callable $task    Work to execute.
     * @param mixed    ...$args Arguments passed to the task.
     */
    public function submit(callable $task, ...$args): void
    {
        // Count the task at submission time. Counting only after the slot is
        // acquired would let wait() see zero while tasks are still queued
        // behind a full channel.
        $this->running++;

        \Swoole\Coroutine::create(function () use ($task, $args) {
            $acquired = false;
            try {
                $acquired = $this->channel->push(true); // occupy a worker slot
                if ($acquired) {
                    $task(...$args);
                }
            } finally {
                $this->running--;
                if ($acquired) {
                    $this->channel->pop(); // release the worker slot
                }
            }
        });
    }

    /**
     * Blocks the calling coroutine until every submitted task has finished.
     * Must itself be called from within a coroutine, since it sleeps through
     * the coroutine scheduler.
     */
    public function wait(): void
    {
        while ($this->running > 0) {
            \Swoole\Coroutine::sleep(0.1);
        }
    }
}
// Usage example: fan out one task per symbol through a CoroutinePool and
// wait for all of them to finish.
// NOTE(review): the closure uses $this, so this snippet presumably lives
// inside a method of a class that has a marketData property and a
// processSymbol() method — neither is shown here; confirm.
// NOTE(review): $symbols is not defined in this snippet; presumably it is
// provided by the surrounding code.
$pool = new CoroutinePool(200); // at most 200 concurrent tasks
foreach ($symbols as $symbol) {
$pool->submit(function() use ($symbol) {
$data = $this->marketData->getHistory($symbol);
$this->processSymbol($symbol, $data);
});
}
$pool->wait();
六、实战案例:券商行情系统
1. 行情数据API
<?php
/**
 * HTTP endpoints exposing real-time quotes and K-line history.
 *
 * NOTE(review): $this->marketService and $this->klineService are used below
 * but are neither declared nor assigned anywhere in this class — confirm how
 * they are injected.
 */
class MarketDataController
{
    /** Number of candles returned when the request does not specify a limit. */
    private const DEFAULT_KLINE_LIMIT = 100;

    /** Upper bound for the number of candles a single request may ask for. */
    private const MAX_KLINE_LIMIT = 1000;

    /**
     * Returns the latest quote for each symbol in the comma-separated
     * "symbols" request parameter.
     */
    public function getRealtime(Request $request)
    {
        // A missing parameter yields null and an array parameter yields an
        // array; explode() needs a string in both cases.
        $raw = $request->get('symbols', '');
        $raw = is_string($raw) ? $raw : '';

        // Trim whitespace and drop empty entries ("AAPL, ,MSFT," and the like).
        $symbols = array_values(array_filter(
            array_map('trim', explode(',', $raw)),
            static fn($symbol) => $symbol !== ''
        ));

        $data = $this->marketService->getLatest($symbols);

        return new JsonResponse([
            'code' => 200,
            'data' => $data,
            'timestamp' => time()
        ]);
    }

    /**
     * Returns K-line candles for one symbol.
     *
     * Request parameters: "symbol", "interval" (default "1m") and "limit"
     * (default 100, clamped to 1..1000).
     */
    public function getKLine(Request $request)
    {
        $symbol = $request->get('symbol');
        $interval = $request->get('interval', '1m');

        // Request parameters arrive as strings; anything that is not an
        // integer falls back to the default, and the result is clamped on
        // both ends so a negative limit cannot reach the service.
        $requested = filter_var($request->get('limit', self::DEFAULT_KLINE_LIMIT), FILTER_VALIDATE_INT);
        if ($requested === false) {
            $requested = self::DEFAULT_KLINE_LIMIT;
        }
        $limit = max(1, min($requested, self::MAX_KLINE_LIMIT));

        $klines = $this->klineService->getKLines($symbol, $interval, $limit);

        return new JsonResponse([
            'code' => 200,
            'data' => array_values($klines),
            'symbol' => $symbol,
            'interval' => $interval
        ]);
    }
}
2. WebSocket客户端集成
<?php
/**
 * Event-style wrapper around a WebSocket transport: sends subscribe requests
 * and dispatches incoming JSON messages to callbacks registered per event.
 *
 * The transport is any object exposing onMessage(callable) and send(string).
 * The original code created it with `new WebSocketClient($url)`, i.e. this
 * very class, which recurses without end; the transport is therefore
 * injected through an optional constructor argument.
 * NOTE(review): the concrete transport library is not visible in this file —
 * confirm which client implementation is meant to be used.
 */
class WebSocketClient
{
    /**
     * Event assumed for messages that carry no "event" field. The server in
     * this file sends {"symbol", "data", "timestamp"} only, and the front-end
     * example listens for "price_update".
     */
    private const DEFAULT_EVENT = 'price_update';

    /** @var object Transport providing onMessage() and send(). */
    private $client;

    /** @var array<string, array<int, callable>> Callbacks keyed by event name. */
    private $callbacks = [];

    /**
     * @param string      $url       Server URL, e.g. ws://host:port.
     * @param object|null $transport Connected transport for $url.
     *
     * @throws \InvalidArgumentException When no usable transport is supplied.
     */
    public function __construct(string $url, $transport = null)
    {
        if (
            !is_object($transport)
            || !method_exists($transport, 'onMessage')
            || !method_exists($transport, 'send')
        ) {
            throw new \InvalidArgumentException(
                "A transport providing onMessage() and send() is required to connect to {$url}"
            );
        }

        $this->client = $transport;
        $this->client->onMessage(function ($message) {
            $data = json_decode((string) $message, true);
            if (!is_array($data)) {
                // Not a JSON object; nothing to dispatch.
                return;
            }

            $event = $data['event'] ?? self::DEFAULT_EVENT;
            if (!is_string($event)) {
                return;
            }

            $this->dispatch($event, $data['data'] ?? null);
        });
    }

    /**
     * Asks the server to start pushing updates for the given symbols.
     */
    public function subscribe(array $symbols)
    {
        $this->client->send(json_encode([
            'action' => 'subscribe',
            'symbols' => $symbols
        ]));
    }

    /**
     * Registers a callback for an event; several callbacks may share one event.
     */
    public function on(string $event, callable $callback)
    {
        $this->callbacks[$event][] = $callback;
    }

    /**
     * Invokes every callback registered for $event, in registration order.
     */
    private function dispatch(string $event, $data)
    {
        foreach ($this->callbacks[$event] ?? [] as $callback) {
            $callback($data);
        }
    }
}
// Front-end usage example: connect, subscribe to a few symbols and redraw the
// chart on every price update.
const MARKET_WS_URL = 'ws://market.example.com:9501';
const WATCHED_SYMBOLS = ['AAPL', 'MSFT', 'GOOGL'];

// Logs the latest price and forwards the update to the chart.
const handlePriceUpdate = (data) => {
    console.log(`股票 ${data.symbol} 最新价: ${data.price}`);
    updateChart(data);
};

const ws = new WebSocketClient(MARKET_WS_URL);
ws.subscribe(WATCHED_SYMBOLS);
ws.on('price_update', handlePriceUpdate);