ThinkPHP 6.x实时数据可视化监控系统开发指南 | 高性能监控解决方案

2025-10-11 0 764

构建基于WebSocket的高性能业务监控平台

一、系统架构设计

实时监控系统需要处理高并发连接和海量数据推送,我们采用分层架构确保系统稳定性和扩展性。

1.1 技术架构图

数据采集层 → 消息队列 → WebSocket服务层 → 前端可视化层
    ↓              ↓             ↓             ↓
业务系统日志    Redis队列     Workerman      ECharts图表
系统性能指标    RabbitMQ      GatewayWorker  实时数据更新
应用监控数据               连接管理/消息分发
            

1.2 核心组件说明

  • 数据采集器:负责收集各类监控指标
  • 消息中间件:缓冲数据,削峰填谷
  • WebSocket服务:维持长连接,实时推送
  • 数据存储器:历史数据存储与分析
  • 可视化面板:多维度数据展示

二、环境配置与依赖

2.1 扩展安装


# 安装Workerman扩展
composer require workerman/workerman

# 安装Redis扩展
composer require predis/predis

# 安装ECharts前端依赖
npm install echarts vue-echarts
            

2.2 配置文件

创建config/monitor.php配置文件:


<?php
return [
    // WebSocket配置
    'websocket' => [
        'host' => '0.0.0.0',
        'port' => 2345,
        'process_count' => 4
    ],
    
    // Redis配置
    'redis' => [
        'host' => '127.0.0.1',
        'port' => 6379,
        'database' => 1,
        'channel' => 'monitor_data'
    ],
    
    // 监控指标配置
    'metrics' => [
        'system' => ['cpu', 'memory', 'disk', 'network'],
        'business' => ['qps', 'error_rate', 'response_time'],
        'application' => ['database', 'cache', 'queue']
    ]
];
            

三、WebSocket服务实现

3.1 WebSocket服务器


<?php
namespace appwebsocket;

use WorkermanWorker;
use WorkermanRedisClient;

class MonitorServer
{
    protected $worker;
    protected $redis;
    
    public function __construct()
    {
        $config = config('monitor.websocket');
        
        $this->worker = new Worker("websocket://{$config['host']}:{$config['port']}");
        $this->worker->count = $config['process_count'];
        
        $this->initialize();
    }
    
    private function initialize()
    {
        $this->worker->onWorkerStart = [$this, 'onWorkerStart'];
        $this->worker->onConnect = [$this, 'onConnect'];
        $this->worker->onMessage = [$this, 'onMessage'];
        $this->worker->onClose = [$this, 'onClose'];
    }
    
    public function onWorkerStart($worker)
    {
        // 连接Redis并订阅监控频道
        $this->redis = new Client(config('monitor.redis'));
        
        // 订阅数据更新频道
        $this->redis->subscribe(['monitor_data'], function ($channel, $data) use ($worker) {
            $this->broadcast($worker, $data);
        });
    }
    
    public function onConnect($connection)
    {
        echo "新的监控客户端连接: {$connection->id}n";
        
        // 发送连接成功消息
        $connection->send(json_encode([
            'type' => 'connection',
            'status' => 'success',
            'timestamp' => time()
        ]));
    }
    
    public function onMessage($connection, $data)
    {
        $message = json_decode($data, true);
        
        switch ($message['type'] ?? '') {
            case 'subscribe':
                $this->handleSubscribe($connection, $message);
                break;
            case 'unsubscribe':
                $this->handleUnsubscribe($connection, $message);
                break;
            case 'ping':
                $connection->send(json_encode(['type' => 'pong']));
                break;
        }
    }
    
    public function onClose($connection)
    {
        echo "监控客户端断开连接: {$connection->id}n";
    }
    
    private function handleSubscribe($connection, $message)
    {
        $metrics = $message['metrics'] ?? [];
        $connection->metrics = $metrics;
        
        $connection->send(json_encode([
            'type' => 'subscription',
            'metrics' => $metrics,
            'status' => 'success'
        ]));
    }
    
    private function broadcast($worker, $data)
    {
        foreach ($worker->connections as $connection) {
            if (isset($connection->metrics)) {
                $connection->send($data);
            }
        }
    }
    
    public function run()
    {
        Worker::runAll();
    }
}
            

3.2 监控数据采集器


<?php
namespace appservice;

use thinkfacadeDb;

class MetricsCollector
{
    public function collectSystemMetrics()
    {
        return [
            'cpu_usage' => $this->getCpuUsage(),
            'memory_usage' => $this->getMemoryUsage(),
            'disk_usage' => $this->getDiskUsage(),
            'network_io' => $this->getNetworkIO(),
            'timestamp' => time()
        ];
    }
    
    public function collectBusinessMetrics()
    {
        return [
            'qps' => $this->calculateQPS(),
            'error_rate' => $this->calculateErrorRate(),
            'response_time' => $this->getAverageResponseTime(),
            'active_users' => $this->getActiveUsers(),
            'timestamp' => time()
        ];
    }
    
    public function collectApplicationMetrics()
    {
        return [
            'database_connections' => $this->getDatabaseConnections(),
            'cache_hit_rate' => $this->getCacheHitRate(),
            'queue_length' => $this->getQueueLength(),
            'timestamp' => time()
        ];
    }
    
    private function getCpuUsage()
    {
        // 获取CPU使用率
        $load = sys_getloadavg();
        return round($load[0] * 100 / number_format(ini_get('max_execution_time')), 2);
    }
    
    private function getMemoryUsage()
    {
        $memory = memory_get_usage(true);
        $total = $this->getTotalMemory();
        return round(($memory / $total) * 100, 2);
    }
    
    private function calculateQPS()
    {
        // 统计最近1秒的请求数
        $count = Db::name('request_log')
            ->where('create_time', '>=', time() - 1)
            ->count();
            
        return $count;
    }
}
            

3.3 数据推送服务


<?php
namespace appjob;

use thinkqueueJob;

class MonitorDataPush
{
    public function fire(Job $job, $data)
    {
        try {
            $redis = new Redis();
            $redis->connect(config('monitor.redis.host'), config('monitor.redis.port'));
            
            // 发布数据到Redis频道
            $redis->publish('monitor_data', json_encode($data));
            
            $job->delete();
            
        } catch (Exception $e) {
            // 记录日志
            thinkfacadeLog::error('监控数据推送失败: ' . $e->getMessage());
            
            if ($job->attempts() > 3) {
                $job->delete();
            }
        }
    }
}
            

四、监控面板开发

4.1 前端WebSocket客户端


class MonitorClient {
    constructor(options = {}) {
        this.ws = null;
        this.options = Object.assign({
            url: 'ws://localhost:2345',
            reconnectInterval: 5000,
            maxReconnectAttempts: 10
        }, options);
        
        this.metrics = new Set();
        this.reconnectAttempts = 0;
        this.init();
    }
    
    init() {
        this.connect();
        this.setupEventListeners();
    }
    
    connect() {
        try {
            this.ws = new WebSocket(this.options.url);
            
            this.ws.onopen = () => {
                console.log('监控连接已建立');
                this.reconnectAttempts = 0;
                
                // 重新订阅之前的指标
                if (this.metrics.size > 0) {
                    this.subscribe([...this.metrics]);
                }
            };
            
            this.ws.onmessage = (event) => {
                this.handleMessage(JSON.parse(event.data));
            };
            
            this.ws.onclose = () => {
                console.log('监控连接已关闭');
                this.handleReconnect();
            };
            
            this.ws.onerror = (error) => {
                console.error('监控连接错误:', error);
            };
            
        } catch (error) {
            console.error('建立监控连接失败:', error);
        }
    }
    
    handleMessage(data) {
        switch (data.type) {
            case 'metrics_update':
                this.updateCharts(data.data);
                break;
            case 'alert':
                this.showAlert(data);
                break;
            case 'pong':
                // 心跳响应
                break;
        }
    }
    
    subscribe(metrics) {
        metrics.forEach(metric => this.metrics.add(metric));
        
        if (this.ws && this.ws.readyState === WebSocket.OPEN) {
            this.ws.send(JSON.stringify({
                type: 'subscribe',
                metrics: metrics
            }));
        }
    }
    
    updateCharts(data) {
        // 更新ECharts图表
        if (window.monitorCharts) {
            Object.keys(data).forEach(metricType => {
                const chart = window.monitorCharts[metricType];
                if (chart) {
                    this.updateChartData(chart, data[metricType]);
                }
            });
        }
    }
    
    updateChartData(chart, newData) {
        const option = chart.getOption();
        
        // 更新系列数据
        option.series.forEach((series, index) => {
            if (newData[series.name]) {
                // 保留最近100个数据点
                const data = series.data || [];
                data.push(newData[series.name]);
                if (data.length > 100) {
                    data.shift();
                }
                option.series[index].data = data;
            }
        });
        
        // 更新x轴时间
        const times = option.xAxis[0].data || [];
        times.push(new Date().toLocaleTimeString());
        if (times.length > 100) {
            times.shift();
        }
        option.xAxis[0].data = times;
        
        chart.setOption(option);
    }
    
    handleReconnect() {
        if (this.reconnectAttempts  this.connect(), this.options.reconnectInterval);
        }
    }
}

// 初始化监控客户端
const monitor = new MonitorClient();
            

4.2 ECharts图表配置


// CPU使用率图表
function createCpuChart(container) {
    const chart = echarts.init(document.getElementById(container));
    
    const option = {
        title: { text: 'CPU使用率监控' },
        tooltip: { trigger: 'axis' },
        legend: { data: ['CPU使用率'] },
        xAxis: {
            type: 'category',
            data: []
        },
        yAxis: {
            type: 'value',
            max: 100,
            axisLabel: { formatter: '{value}%' }
        },
        series: [{
            name: 'CPU使用率',
            type: 'line',
            data: [],
            smooth: true,
            areaStyle: {}
        }]
    };
    
    chart.setOption(option);
    return chart;
}

// 内存使用图表
function createMemoryChart(container) {
    const chart = echarts.init(document.getElementById(container));
    
    const option = {
        title: { text: '内存使用监控' },
        tooltip: { trigger: 'axis' },
        legend: { data: ['内存使用率'] },
        xAxis: { type: 'category', data: [] },
        yAxis: {
            type: 'value',
            max: 100,
            axisLabel: { formatter: '{value}%' }
        },
        series: [{
            name: '内存使用率',
            type: 'line',
            data: [],
            smooth: true,
            areaStyle: {}
        }]
    };
    
    chart.setOption(option);
    return chart;
}
            

五、性能优化策略

5.1 数据采样与聚合


<?php
namespace appservice;

class DataAggregator
{
    public function aggregateMetrics($rawData, $interval = 60)
    {
        $aggregated = [];
        
        foreach ($rawData as $metricType => $dataPoints) {
            $aggregated[$metricType] = $this->aggregateDataPoints($dataPoints, $interval);
        }
        
        return $aggregated;
    }
    
    private function aggregateDataPoints($dataPoints, $interval)
    {
        if (count($dataPoints) === 0) {
            return [];
        }
        
        // 按时间窗口聚合
        $windowStart = floor($dataPoints[0]['timestamp'] / $interval) * $interval;
        $currentWindow = [];
        $result = [];
        
        foreach ($dataPoints as $point) {
            $pointWindow = floor($point['timestamp'] / $interval) * $interval;
            
            if ($pointWindow !== $windowStart) {
                if (!empty($currentWindow)) {
                    $result[] = [
                        'timestamp' => $windowStart,
                        'value' => array_sum($currentWindow) / count($currentWindow),
                        'count' => count($currentWindow)
                    ];
                }
                
                $windowStart = $pointWindow;
                $currentWindow = [];
            }
            
            $currentWindow[] = $point['value'];
        }
        
        return $result;
    }
}
            

5.2 连接管理与心跳检测


<?php
class ConnectionManager
{
    protected $connections = [];
    
    public function addConnection($connectionId, $connection)
    {
        $this->connections[$connectionId] = [
            'connection' => $connection,
            'last_activity' => time(),
            'subscribed_metrics' => []
        ];
    }
    
    public function removeConnection($connectionId)
    {
        unset($this->connections[$connectionId]);
    }
    
    public function updateActivity($connectionId)
    {
        if (isset($this->connections[$connectionId])) {
            $this->connections[$connectionId]['last_activity'] = time();
        }
    }
    
    public function cleanupInactiveConnections($timeout = 300)
    {
        $now = time();
        
        foreach ($this->connections as $connectionId => $info) {
            if ($now - $info['last_activity'] > $timeout) {
                $info['connection']->close();
                $this->removeConnection($connectionId);
            }
        }
    }
    
    public function broadcastToSubscribers($metricType, $data)
    {
        foreach ($this->connections as $info) {
            if (in_array($metricType, $info['subscribed_metrics'])) {
                $info['connection']->send(json_encode([
                    'type' => 'metrics_update',
                    'metric' => $metricType,
                    'data' => $data
                ]));
            }
        }
    }
}
            

项目部署与监控

完成开发后,使用以下命令启动监控系统:


# 启动WebSocket服务
php think monitor:start

# 启动数据采集器
php think collector:start

# 监控服务状态
php think monitor:status
            

系统特色功能:

  • 实时数据推送,延迟低于100毫秒
  • 支持万级并发连接
  • 灵活的数据聚合策略
  • 完整的连接生命周期管理
  • 多维度监控指标
  • 自动故障恢复机制

ThinkPHP 6.x实时数据可视化监控系统开发指南 | 高性能监控解决方案
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 thinkphp ThinkPHP 6.x实时数据可视化监控系统开发指南 | 高性能监控解决方案 https://www.taomawang.com/server/thinkphp/1198.html

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务