ThinkPHP6物联网平台开发 | 设备接入与实时监控系统实战

2025-08-14 0 190

一、平台架构设计

本教程将基于ThinkPHP 6.1构建一个完整的物联网平台,实现设备接入、数据采集和实时监控功能。

技术架构:

  • 核心框架:ThinkPHP 6.1 + Swoole
  • 通信协议:MQTT 3.1.1 + WebSocket
  • 实时数据库:InfluxDB 2.0
  • 消息队列:RabbitMQ 3.9
  • 可视化组件:ECharts 5.3

核心功能模块:

  1. 多协议设备接入
  2. 时序数据存储
  3. 实时数据推送
  4. 告警规则引擎
  5. 可视化仪表盘

二、项目初始化与配置

1. 项目创建与扩展安装

# 创建ThinkPHP6项目
composer create-project topthink/think tp-iot-platform

# 安装必要扩展
cd tp-iot-platform
composer require topthink/think-swoole
composer require simps/mqtt
composer require influxdb/influxdb-php

# 配置数据库连接
// config/database.php
return [
    'connections' => [
        'mysql' => [
            'type'      => 'mysql',
            'hostname'  => '127.0.0.1',
            'database'  => 'iot_platform',
            'username'  => 'root',
            'password'  => '',
            'charset'   => 'utf8mb4',
        ],
        'influxdb' => [
            'type'      => 'influxdb',
            'host'      => 'http://localhost:8086',
            'token'     => 'your-token',
            'org'       => 'iot-org',
            'bucket'    => 'iot-bucket'
        ]
    ]
];

2. 目录结构设计

tp-iot-platform/
├── app/
│   ├── controller/
│   ├── service/      # 服务层
│   │   ├── DeviceService.php
│   │   ├── DataService.php
│   │   └── AlarmService.php
│   ├── model/
│   │   ├── device/   # 设备模型
│   │   └── data/     # 数据模型
│   ├── library/      # 类库
│   │   ├── MqttClient.php
│   │   └── InfluxClient.php
│   ├── task/         # 异步任务
│   └── middleware/   # 中间件
├── config/
│   ├── mqtt.php      # MQTT配置
│   └── influxdb.php  # InfluxDB配置
├── extend/
└── public/

三、设备接入服务

1. MQTT协议集成

// app/library/MqttClient.php
namespace applibrary;

use SimpsMQTTClient;
use thinkfacadeConfig;

class MqttClient
{
    private $client;
    
    public function __construct()
    {
        $config = Config::get('mqtt');
        $this->client = new Client(
            $config['host'], 
            $config['port'], 
            $config['config']
        );
        
        $this->connect();
    }
    
    private function connect()
    {
        try {
            $this->client->connect();
        } catch (Exception $e) {
            throw new Exception("MQTT连接失败: " . $e->getMessage());
        }
    }
    
    // 订阅设备主题
    public function subscribe($topic, $callback)
    {
        $this->client->subscribe([$topic => [
            'qos' => 1,
            'callback' => $callback
        ]]);
    }
    
    // 发布消息到设备
    public function publish($topic, $message)
    {
        return $this->client->publish($topic, $message, 1);
    }
    
    // 处理设备消息
    public function processMessage($topic, $message)
    {
        $deviceId = $this->extractDeviceId($topic);
        $data = json_decode($message, true);
        
        if ($data && $deviceId) {
            // 保存设备数据
            app('DataService')->saveDeviceData($deviceId, $data);
            
            // 检查告警规则
            app('AlarmService')->checkRules($deviceId, $data);
        }
    }
    
    private function extractDeviceId($topic)
    {
        $parts = explode('/', $topic);
        return $parts[1] ?? null;
    }
}

2. 设备管理服务

// app/service/DeviceService.php
namespace appservice;

use thinkfacadeDb;
use appmodeldeviceDevice;

class DeviceService
{
    // 设备注册
    public function register($data)
    {
        $device = Device::where('product_key', $data['product_key'])
            ->where('device_name', $data['device_name'])
            ->find();
            
        if ($device) {
            // 更新设备信息
            $device->save([
                'last_active_time' => time(),
                'status' => 1
            ]);
            return $device;
        }
        
        // 创建设备
        return Device::create([
            'product_key' => $data['product_key'],
            'device_name' => $data['device_name'],
            'device_secret' => $this->generateSecret(),
            'status' => 1,
            'create_time' => time(),
            'last_active_time' => time()
        ]);
    }
    
    // 生成设备密钥
    private function generateSecret()
    {
        return md5(uniqid(microtime(true), true));
    }
    
    // 获取设备接入配置
    public function getConnectConfig($deviceId)
    {
        $device = Device::find($deviceId);
        if (!$device) {
            throw new Exception('设备不存在');
        }
        
        return [
            'host' => config('mqtt.host'),
            'port' => config('mqtt.port'),
            'client_id' => $device->product_key . '|' . $device->device_name,
            'username' => $device->device_name,
            'password' => $device->device_secret,
            'topic' => "device/{$deviceId}/data"
        ];
    }
    
    // 获取设备列表
    public function getDeviceList($params)
    {
        $query = Device::withSearch(['status', 'keyword'], $params);
        return $query->paginate(10);
    }
}

四、数据处理服务

1. 时序数据存储

// app/service/DataService.php
namespace appservice;

use InfluxDB2Client as InfluxClient;
use InfluxDB2ModelWritePrecision;
use thinkfacadeConfig;

class DataService
{
    private $client;
    private $bucket;
    private $org;
    
    public function __construct()
    {
        $config = Config::get('database.connections.influxdb');
        $this->client = new InfluxClient([
            'url' => $config['host'],
            'token' => $config['token'],
        ]);
        $this->bucket = $config['bucket'];
        $this->org = $config['org'];
    }
    
    // 保存设备数据
    public function saveDeviceData($deviceId, $data)
    {
        $writeApi = $this->client->createWriteApi();
        
        $point = [
            'name' => 'device_metrics',
            'tags' => ['device_id' => $deviceId],
            'fields' => $data,
            'time' => microtime(true)
        ];
        
        $writeApi->write($point, 
            WritePrecision::NS, 
            $this->bucket, 
            $this->org
        );
        
        // 同时保存到MySQL做备份
        Db::name('device_data')->insert([
            'device_id' => $deviceId,
            'data' => json_encode($data),
            'create_time' => time()
        ]);
    }
    
    // 查询设备历史数据
    public function queryDeviceData($deviceId, $start, $stop, $fields = [])
    {
        $queryApi = $this->client->createQueryApi();
        
        $query = 'from(bucket: "' . $this->bucket . '")
          |> range(start: ' . $start . ', stop: ' . $stop . ')
          |> filter(fn: (r) => r["_measurement"] == "device_metrics")
          |> filter(fn: (r) => r["device_id"] == "' . $deviceId . '")';
        
        if (!empty($fields)) {
            $fieldFilters = array_map(fn($f) => 'r["_field"] == "' . $f . '"', $fields);
            $query .= '|> filter(fn: (r) => ' . implode(' or ', $fieldFilters) . ')';
        }
        
        return $queryApi->query($query, $this->org);
    }
}

2. 实时数据推送

// app/task/WebSocketServer.php
namespace apptask;

use thinkswooleWebSocket;
use thinkswoolewebsocketRoom;

class WebSocketServer
{
    protected $websocket;
    protected $room;
    
    public function __construct(WebSocket $websocket, Room $room)
    {
        $this->websocket = $websocket;
        $this->room = $room;
    }
    
    // 监听WebSocket连接
    public function onOpen($server, $request)
    {
        $deviceId = $request->get['device_id'] ?? 0;
        if ($deviceId) {
            $this->room->add($request->fd, 'device_' . $deviceId);
        }
    }
    
    // 向设备组推送数据
    public function pushToDevice($deviceId, $data)
    {
        $this->websocket->to('device_' . $deviceId)->emit('data', $data);
    }
    
    // 广播系统消息
    public function broadcast($event, $data)
    {
        $this->websocket->broadcast()->emit($event, $data);
    }
}

// 在MQTT回调中推送数据
$mqttClient->subscribe('device/+/data', function($topic, $message) {
    $deviceId = extractDeviceId($topic);
    $data = json_decode($message, true);
    
    // 推送到WebSocket
    app('WebSocketServer')->pushToDevice($deviceId, [
        'type' => 'realtime',
        'data' => $data,
        'time' => time()
    ]);
});

五、告警规则引擎

1. 规则模型设计

// app/model/device/AlarmRule.php
namespace appmodeldevice;

use thinkModel;

class AlarmRule extends Model
{
    protected $table = 'device_alarm_rule';
    
    // 规则触发条件
    const CONDITION_GT = '>';  // 大于
    const CONDITION_LT = 'belongsTo(Device::class);
    }
    
    // 获取条件运算符
    public static function getConditions()
    {
        return [
            self::CONDITION_GT => '大于',
            self::CONDITION_LT => '小于',
            self::CONDITION_EQ => '等于',
            self::CONDITION_NEQ => '不等于'
        ];
    }
    
    // 检查规则是否触发
    public function check($value)
    {
        switch ($this->condition) {
            case self::CONDITION_GT:
                return $value > $this->threshold;
            case self::CONDITION_LT:
                return $value threshold;
            case self::CONDITION_EQ:
                return $value == $this->threshold;
            case self::CONDITION_NEQ:
                return $value != $this->threshold;
            default:
                return false;
        }
    }
}

2. 告警服务实现

// app/service/AlarmService.php
namespace appservice;

use appmodeldeviceAlarmRule;
use appmodeldeviceAlarmLog;
use apptaskWebSocketServer;

class AlarmService
{
    // 检查设备数据触发规则
    public function checkRules($deviceId, $data)
    {
        $rules = AlarmRule::where('device_id', $deviceId)
            ->where('status', AlarmRule::STATUS_ENABLED)
            ->select();
            
        foreach ($rules as $rule) {
            if (isset($data[$rule->field])) {
                $value = $data[$rule->field];
                if ($rule->check($value)) {
                    $this->triggerAlarm($rule, $value);
                }
            }
        }
    }
    
    // 触发告警
    private function triggerAlarm($rule, $currentValue)
    {
        // 记录告警日志
        $log = AlarmLog::create([
            'rule_id' => $rule->id,
            'device_id' => $rule->device_id,
            'trigger_value' => $currentValue,
            'create_time' => time()
        ]);
        
        // 推送告警通知
        $message = sprintf(
            "设备%s触发告警规则:%s %s %s,当前值:%s",
            $rule->device->device_name,
            $rule->field,
            $rule->condition,
            $rule->threshold,
            $currentValue
        );
        
        // 推送到WebSocket
        app(WebSocketServer::class)->pushToDevice($rule->device_id, [
            'type' => 'alarm',
            'data' => [
                'rule_id' => $rule->id,
                'message' => $message,
                'time' => time()
            ]
        ]);
        
        // 发送邮件或短信通知
        if ($rule->notify_type) {
            $this->sendNotification($rule, $message);
        }
    }
    
    // 发送外部通知
    private function sendNotification($rule, $message)
    {
        // 实现邮件或短信通知逻辑
        // 可以使用异步任务队列处理
    }
}

六、数据可视化

1. 实时监控仪表盘

// app/controller/Dashboard.php
namespace appcontroller;

use thinkfacadeView;
use appserviceDeviceService;
use appserviceDataService;

class Dashboard
{
    public function index($deviceId)
    {
        $device = app(DeviceService::class)->getDevice($deviceId);
        $metrics = app(DataService::class)->getDeviceMetrics($deviceId);
        
        return View::assign([
            'device' => $device,
            'metrics' => $metrics
        ])->fetch();
    }
    
    public function getRealtimeData($deviceId)
    {
        // 获取最近1小时数据
        $data = app(DataService::class)->queryDeviceData(
            $deviceId,
            '-1h',
            'now()',
            ['temperature', 'humidity']
        );
        
        return json([
            'code' => 1,
            'data' => $this->formatChartData($data)
        ]);
    }
    
    private function formatChartData($influxData)
    {
        $result = [];
        foreach ($influxData as $table) {
            foreach ($table->records as $record) {
                $time = $record->getTime();
                $field = $record->getField();
                $value = $record->getValue();
                
                if (!isset($result[$time])) {
                    $result[$time] = ['time' => $time];
                }
                
                $result[$time][$field] = $value;
            }
        }
        
        return array_values($result);
    }
}

2. 前端实时图表

// 使用ECharts绘制实时图表
const chart = echarts.init(document.getElementById('chart'));
const socket = new WebSocket(`ws://${location.host}/ws?device_id=${deviceId}`);

// WebSocket消息处理
socket.addEventListener('message', event => {
    const data = JSON.parse(event.data);
    
    if (data.type === 'realtime') {
        updateRealtimeChart(data.data);
    } else if (data.type === 'alarm') {
        showAlarmNotification(data.data);
    }
});

// 更新实时图表
function updateRealtimeChart(data) {
    const option = chart.getOption();
    const now = new Date();
    
    // 添加新数据点
    option.series[0].data.push({
        name: now.toString(),
        value: [now, data.temperature]
    });
    option.series[1].data.push({
        name: now.toString(),
        value: [now, data.humidity]
    });
    
    // 保留最近100个点
    if (option.series[0].data.length > 100) {
        option.series[0].data.shift();
        option.series[1].data.shift();
    }
    
    chart.setOption(option);
}

// 显示告警通知
function showAlarmNotification(data) {
    const notification = document.createElement('div');
    notification.className = 'alarm-notification';
    notification.innerHTML = `
        

设备告警

${data.message}

${new Date(data.time * 1000).toLocaleString()} `; document.body.appendChild(notification); setTimeout(() => { notification.remove(); }, 5000); }

七、系统部署方案

1. Docker生产环境部署

# docker-compose.yml
version: '3.8'

services:
  app:
    build:
      context: .
      dockerfile: Dockerfile
    image: tp-iot-platform
    container_name: iot-app
    restart: unless-stopped
    ports:
      - "9501:9501"
      - "9502:9502" # WebSocket端口
    depends_on:
      - influxdb
      - rabbitmq
      - redis
  
  influxdb:
    image: influxdb:2.0
    container_name: iot-influxdb
    ports:
      - "8086:8086"
    volumes:
      - influx_data:/var/lib/influxdb2
    environment:
      DOCKER_INFLUXDB_INIT_MODE: setup
      DOCKER_INFLUXDB_INIT_USERNAME: admin
      DOCKER_INFLUXDB_INIT_PASSWORD: admin123
      DOCKER_INFLUXDB_INIT_ORG: iot-org
      DOCKER_INFLUXDB_INIT_BUCKET: iot-bucket
      DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: your-token
  
  rabbitmq:
    image: rabbitmq:3.9-management
    container_name: iot-rabbitmq
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - rabbit_data:/var/lib/rabbitmq

volumes:
  influx_data:
  rabbit_data:

八、总结与扩展

本教程构建了一个完整的物联网平台:

  1. 实现了多协议设备接入
  2. 开发了时序数据存储
  3. 构建了实时监控系统
  4. 设计了告警规则引擎
  5. 实现了数据可视化

扩展方向:

  • 边缘计算集成
  • AI异常检测
  • 设备固件OTA升级
  • 多租户支持

完整项目代码已开源:https://github.com/example/tp-iot-platform

ThinkPHP6物联网平台开发 | 设备接入与实时监控系统实战
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 thinkphp ThinkPHP6物联网平台开发 | 设备接入与实时监控系统实战 https://www.taomawang.com/server/thinkphp/827.html

下一篇:

已经没有下一篇了!

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务