构建基于WebSocket的高性能业务监控平台
一、系统架构设计
实时监控系统需要处理高并发连接和海量数据推送,我们采用分层架构确保系统稳定性和扩展性。
1.1 技术架构图
数据采集层 → 消息队列 → WebSocket服务层 → 前端可视化层 ↓ ↓ ↓ ↓ 业务系统日志 Redis队列 Workerman ECharts图表 系统性能指标 RabbitMQ GatewayWorker 实时数据更新 应用监控数据 连接管理/消息分发
1.2 核心组件说明
- 数据采集器:负责收集各类监控指标
- 消息中间件:缓冲数据,削峰填谷
- WebSocket服务:维持长连接,实时推送
- 数据存储器:历史数据存储与分析
- 可视化面板:多维度数据展示
二、环境配置与依赖
2.1 扩展安装
# 安装Workerman扩展
composer require workerman/workerman
# 安装Redis扩展
composer require predis/predis
# 安装ECharts前端依赖
npm install echarts vue-echarts
2.2 配置文件
创建config/monitor.php
配置文件:
<?php
return [
// WebSocket配置
'websocket' => [
'host' => '0.0.0.0',
'port' => 2345,
'process_count' => 4
],
// Redis配置
'redis' => [
'host' => '127.0.0.1',
'port' => 6379,
'database' => 1,
'channel' => 'monitor_data'
],
// 监控指标配置
'metrics' => [
'system' => ['cpu', 'memory', 'disk', 'network'],
'business' => ['qps', 'error_rate', 'response_time'],
'application' => ['database', 'cache', 'queue']
]
];
三、WebSocket服务实现
3.1 WebSocket服务器
<?php
namespace appwebsocket;
use WorkermanWorker;
use WorkermanRedisClient;
class MonitorServer
{
protected $worker;
protected $redis;
public function __construct()
{
$config = config('monitor.websocket');
$this->worker = new Worker("websocket://{$config['host']}:{$config['port']}");
$this->worker->count = $config['process_count'];
$this->initialize();
}
private function initialize()
{
$this->worker->onWorkerStart = [$this, 'onWorkerStart'];
$this->worker->onConnect = [$this, 'onConnect'];
$this->worker->onMessage = [$this, 'onMessage'];
$this->worker->onClose = [$this, 'onClose'];
}
public function onWorkerStart($worker)
{
// 连接Redis并订阅监控频道
$this->redis = new Client(config('monitor.redis'));
// 订阅数据更新频道
$this->redis->subscribe(['monitor_data'], function ($channel, $data) use ($worker) {
$this->broadcast($worker, $data);
});
}
public function onConnect($connection)
{
echo "新的监控客户端连接: {$connection->id}n";
// 发送连接成功消息
$connection->send(json_encode([
'type' => 'connection',
'status' => 'success',
'timestamp' => time()
]));
}
public function onMessage($connection, $data)
{
$message = json_decode($data, true);
switch ($message['type'] ?? '') {
case 'subscribe':
$this->handleSubscribe($connection, $message);
break;
case 'unsubscribe':
$this->handleUnsubscribe($connection, $message);
break;
case 'ping':
$connection->send(json_encode(['type' => 'pong']));
break;
}
}
public function onClose($connection)
{
echo "监控客户端断开连接: {$connection->id}n";
}
private function handleSubscribe($connection, $message)
{
$metrics = $message['metrics'] ?? [];
$connection->metrics = $metrics;
$connection->send(json_encode([
'type' => 'subscription',
'metrics' => $metrics,
'status' => 'success'
]));
}
private function broadcast($worker, $data)
{
foreach ($worker->connections as $connection) {
if (isset($connection->metrics)) {
$connection->send($data);
}
}
}
public function run()
{
Worker::runAll();
}
}
3.2 监控数据采集器
<?php
namespace appservice;
use thinkfacadeDb;
class MetricsCollector
{
public function collectSystemMetrics()
{
return [
'cpu_usage' => $this->getCpuUsage(),
'memory_usage' => $this->getMemoryUsage(),
'disk_usage' => $this->getDiskUsage(),
'network_io' => $this->getNetworkIO(),
'timestamp' => time()
];
}
public function collectBusinessMetrics()
{
return [
'qps' => $this->calculateQPS(),
'error_rate' => $this->calculateErrorRate(),
'response_time' => $this->getAverageResponseTime(),
'active_users' => $this->getActiveUsers(),
'timestamp' => time()
];
}
public function collectApplicationMetrics()
{
return [
'database_connections' => $this->getDatabaseConnections(),
'cache_hit_rate' => $this->getCacheHitRate(),
'queue_length' => $this->getQueueLength(),
'timestamp' => time()
];
}
private function getCpuUsage()
{
// 获取CPU使用率
$load = sys_getloadavg();
return round($load[0] * 100 / number_format(ini_get('max_execution_time')), 2);
}
private function getMemoryUsage()
{
$memory = memory_get_usage(true);
$total = $this->getTotalMemory();
return round(($memory / $total) * 100, 2);
}
private function calculateQPS()
{
// 统计最近1秒的请求数
$count = Db::name('request_log')
->where('create_time', '>=', time() - 1)
->count();
return $count;
}
}
3.3 数据推送服务
<?php
namespace appjob;
use thinkqueueJob;
class MonitorDataPush
{
public function fire(Job $job, $data)
{
try {
$redis = new Redis();
$redis->connect(config('monitor.redis.host'), config('monitor.redis.port'));
// 发布数据到Redis频道
$redis->publish('monitor_data', json_encode($data));
$job->delete();
} catch (Exception $e) {
// 记录日志
thinkfacadeLog::error('监控数据推送失败: ' . $e->getMessage());
if ($job->attempts() > 3) {
$job->delete();
}
}
}
}
四、监控面板开发
4.1 前端WebSocket客户端
class MonitorClient {
constructor(options = {}) {
this.ws = null;
this.options = Object.assign({
url: 'ws://localhost:2345',
reconnectInterval: 5000,
maxReconnectAttempts: 10
}, options);
this.metrics = new Set();
this.reconnectAttempts = 0;
this.init();
}
init() {
this.connect();
this.setupEventListeners();
}
connect() {
try {
this.ws = new WebSocket(this.options.url);
this.ws.onopen = () => {
console.log('监控连接已建立');
this.reconnectAttempts = 0;
// 重新订阅之前的指标
if (this.metrics.size > 0) {
this.subscribe([...this.metrics]);
}
};
this.ws.onmessage = (event) => {
this.handleMessage(JSON.parse(event.data));
};
this.ws.onclose = () => {
console.log('监控连接已关闭');
this.handleReconnect();
};
this.ws.onerror = (error) => {
console.error('监控连接错误:', error);
};
} catch (error) {
console.error('建立监控连接失败:', error);
}
}
handleMessage(data) {
switch (data.type) {
case 'metrics_update':
this.updateCharts(data.data);
break;
case 'alert':
this.showAlert(data);
break;
case 'pong':
// 心跳响应
break;
}
}
subscribe(metrics) {
metrics.forEach(metric => this.metrics.add(metric));
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({
type: 'subscribe',
metrics: metrics
}));
}
}
updateCharts(data) {
// 更新ECharts图表
if (window.monitorCharts) {
Object.keys(data).forEach(metricType => {
const chart = window.monitorCharts[metricType];
if (chart) {
this.updateChartData(chart, data[metricType]);
}
});
}
}
updateChartData(chart, newData) {
const option = chart.getOption();
// 更新系列数据
option.series.forEach((series, index) => {
if (newData[series.name]) {
// 保留最近100个数据点
const data = series.data || [];
data.push(newData[series.name]);
if (data.length > 100) {
data.shift();
}
option.series[index].data = data;
}
});
// 更新x轴时间
const times = option.xAxis[0].data || [];
times.push(new Date().toLocaleTimeString());
if (times.length > 100) {
times.shift();
}
option.xAxis[0].data = times;
chart.setOption(option);
}
handleReconnect() {
if (this.reconnectAttempts this.connect(), this.options.reconnectInterval);
}
}
}
// 初始化监控客户端
const monitor = new MonitorClient();
4.2 ECharts图表配置
// CPU使用率图表
function createCpuChart(container) {
const chart = echarts.init(document.getElementById(container));
const option = {
title: { text: 'CPU使用率监控' },
tooltip: { trigger: 'axis' },
legend: { data: ['CPU使用率'] },
xAxis: {
type: 'category',
data: []
},
yAxis: {
type: 'value',
max: 100,
axisLabel: { formatter: '{value}%' }
},
series: [{
name: 'CPU使用率',
type: 'line',
data: [],
smooth: true,
areaStyle: {}
}]
};
chart.setOption(option);
return chart;
}
// 内存使用图表
function createMemoryChart(container) {
const chart = echarts.init(document.getElementById(container));
const option = {
title: { text: '内存使用监控' },
tooltip: { trigger: 'axis' },
legend: { data: ['内存使用率'] },
xAxis: { type: 'category', data: [] },
yAxis: {
type: 'value',
max: 100,
axisLabel: { formatter: '{value}%' }
},
series: [{
name: '内存使用率',
type: 'line',
data: [],
smooth: true,
areaStyle: {}
}]
};
chart.setOption(option);
return chart;
}
五、性能优化策略
5.1 数据采样与聚合
<?php
namespace appservice;
class DataAggregator
{
public function aggregateMetrics($rawData, $interval = 60)
{
$aggregated = [];
foreach ($rawData as $metricType => $dataPoints) {
$aggregated[$metricType] = $this->aggregateDataPoints($dataPoints, $interval);
}
return $aggregated;
}
private function aggregateDataPoints($dataPoints, $interval)
{
if (count($dataPoints) === 0) {
return [];
}
// 按时间窗口聚合
$windowStart = floor($dataPoints[0]['timestamp'] / $interval) * $interval;
$currentWindow = [];
$result = [];
foreach ($dataPoints as $point) {
$pointWindow = floor($point['timestamp'] / $interval) * $interval;
if ($pointWindow !== $windowStart) {
if (!empty($currentWindow)) {
$result[] = [
'timestamp' => $windowStart,
'value' => array_sum($currentWindow) / count($currentWindow),
'count' => count($currentWindow)
];
}
$windowStart = $pointWindow;
$currentWindow = [];
}
$currentWindow[] = $point['value'];
}
return $result;
}
}
5.2 连接管理与心跳检测
<?php
class ConnectionManager
{
protected $connections = [];
public function addConnection($connectionId, $connection)
{
$this->connections[$connectionId] = [
'connection' => $connection,
'last_activity' => time(),
'subscribed_metrics' => []
];
}
public function removeConnection($connectionId)
{
unset($this->connections[$connectionId]);
}
public function updateActivity($connectionId)
{
if (isset($this->connections[$connectionId])) {
$this->connections[$connectionId]['last_activity'] = time();
}
}
public function cleanupInactiveConnections($timeout = 300)
{
$now = time();
foreach ($this->connections as $connectionId => $info) {
if ($now - $info['last_activity'] > $timeout) {
$info['connection']->close();
$this->removeConnection($connectionId);
}
}
}
public function broadcastToSubscribers($metricType, $data)
{
foreach ($this->connections as $info) {
if (in_array($metricType, $info['subscribed_metrics'])) {
$info['connection']->send(json_encode([
'type' => 'metrics_update',
'metric' => $metricType,
'data' => $data
]));
}
}
}
}
项目部署与监控
完成开发后,使用以下命令启动监控系统:
# 启动WebSocket服务
php think monitor:start
# 启动数据采集器
php think collector:start
# 监控服务状态
php think monitor:status
系统特色功能:
- 实时数据推送,延迟低于100毫秒
- 支持万级并发连接
- 灵活的数据聚合策略
- 完整的连接生命周期管理
- 多维度监控指标
- 自动故障恢复机制