一、平台架构设计
本教程将基于 ThinkPHP 6.1 构建一个完整的物联网平台，实现设备接入、数据采集和实时监控功能。
技术架构:
- 核心框架:ThinkPHP 6.1 + Swoole
- 通信协议:MQTT 3.1.1 + WebSocket
- 时序数据库:InfluxDB 2.0
- 消息队列:RabbitMQ 3.9
- 可视化组件:ECharts 5.3
核心功能模块:
- 多协议设备接入
- 时序数据存储
- 实时数据推送
- 告警规则引擎
- 可视化仪表盘
二、项目初始化与配置
1. 项目创建与扩展安装
# Create the project from the ThinkPHP 6.1 skeleton (pinned so a newer major is not pulled in)
composer create-project topthink/think:^6.1 tp-iot-platform
# Install the required packages
cd tp-iot-platform
composer require topthink/think-swoole
composer require simps/mqtt
# InfluxDB 2.x client: app/service/DataService.php uses the InfluxDB2 client API,
# which ships in influxdata/influxdb-client-php (influxdb/influxdb-php is the 1.x client).
# The 2.x client needs a PSR-18 HTTP client, hence Guzzle.
composer require influxdata/influxdb-client-php guzzlehttp/guzzle
# 配置数据库连接
// config/database.php
// Database connection settings. Every value can be overridden from the .env file
// so that credentials and tokens do not have to be committed with the code;
// the fallbacks are the original development defaults.
return [
    // Connection used when no connection name is given.
    'default' => env('database.driver', 'mysql'),
    'connections' => [
        // Relational store: device registry, alarm rules, raw data backup.
        'mysql' => [
            'type' => 'mysql',
            'hostname' => env('database.hostname', '127.0.0.1'),
            'database' => env('database.database', 'iot_platform'),
            'username' => env('database.username', 'root'),
            'password' => env('database.password', ''),
            'charset' => env('database.charset', 'utf8mb4'),
        ],
        // Not an ORM connection: this entry is only read as a plain array
        // (Config::get('database.connections.influxdb')) to build the InfluxDB client.
        'influxdb' => [
            'type' => 'influxdb',
            'host' => env('influxdb.host', 'http://localhost:8086'),
            'token' => env('influxdb.token', 'your-token'),
            'org' => env('influxdb.org', 'iot-org'),
            'bucket' => env('influxdb.bucket', 'iot-bucket'),
        ],
    ],
];
2. 目录结构设计
tp-iot-platform/
├── app/
│ ├── controller/
│ ├── service/ # 服务层
│ │ ├── DeviceService.php
│ │ ├── DataService.php
│ │ └── AlarmService.php
│ ├── model/
│ │ ├── device/ # 设备模型
│ │ └── data/ # 数据模型
│ ├── library/ # 类库
│ │ ├── MqttClient.php
│ │ └── InfluxClient.php
│ ├── task/ # 异步任务
│ └── middleware/ # 中间件
├── config/
│ ├── mqtt.php # MQTT配置
│ └── influxdb.php # InfluxDB配置
├── extend/
└── public/
三、设备接入服务
1. MQTT协议集成
// app/library/MqttClient.php
namespace app\library;

use app\service\AlarmService;
use app\service\DataService;
use RuntimeException;
use Simps\MQTT\Client;
use Simps\MQTT\Config\ClientConfig;
use Simps\MQTT\Protocol\Types;
use think\facade\Config;
use Throwable;

/**
 * Wrapper around the simps/mqtt client used to talk to devices.
 *
 * The connection is opened in the constructor from the `mqtt` config entry
 * (`host`, `port`, `config`). Handlers registered with subscribe() are only
 * invoked while listen() is running, because simps/mqtt delivers messages
 * through an explicit recv() call rather than through callbacks.
 */
class MqttClient
{
    /** QoS level used for every subscription and publication. */
    private const QOS = 1;

    /** @var Client */
    private $client;

    /** @var array<string, callable> topic filter => handler($topic, $message) */
    private $handlers = [];

    /**
     * @throws RuntimeException when the configuration is incomplete or the broker cannot be reached
     */
    public function __construct()
    {
        $config = Config::get('mqtt');
        if (!is_array($config) || !isset($config['host'], $config['port'])) {
            throw new RuntimeException('MQTT配置缺失: mqtt.host / mqtt.port');
        }

        // The client constructor requires a ClientConfig object; a config file
        // naturally holds a plain array, so accept both forms.
        $options = $config['config'] ?? [];
        if (is_array($options)) {
            $options = new ClientConfig($options);
        }

        $this->client = new Client($config['host'], (int) $config['port'], $options);
        $this->connect();
    }

    /**
     * Open the broker connection.
     *
     * @throws RuntimeException wrapping the underlying failure
     */
    private function connect()
    {
        try {
            $this->client->connect();
        } catch (Throwable $e) {
            throw new RuntimeException('MQTT连接失败: ' . $e->getMessage(), 0, $e);
        }
    }

    /**
     * Subscribe to a device topic and remember the handler for it.
     *
     * @param string   $topic    topic filter, may contain the MQTT wildcards `+` and `#`
     * @param callable $callback invoked as $callback($topic, $message) from listen()
     */
    public function subscribe($topic, $callback)
    {
        $this->handlers[$topic] = $callback;
        // Only the QoS is sent to the broker; the handler stays local.
        $this->client->subscribe([$topic => self::QOS]);
    }

    /**
     * Publish a message to a device.
     *
     * @param string       $topic
     * @param string|array $message arrays are sent as JSON
     * @return mixed whatever the underlying client returns
     */
    public function publish($topic, $message)
    {
        if (!is_string($message)) {
            $message = json_encode($message);
        }

        return $this->client->publish($topic, $message, self::QOS);
    }

    /**
     * Receive loop: dispatches incoming PUBLISH packets to the subscribed handlers.
     *
     * Runs until the process is stopped or a handler/the client throws.
     * NOTE(review): the packet layout and keep-alive handling follow the
     * simps/mqtt subscribe example — confirm against the installed version.
     */
    public function listen()
    {
        $lastActivity = time();

        while (true) {
            $packet = $this->client->recv();

            if (is_array($packet)) {
                $lastActivity = time();

                if (($packet['type'] ?? null) === Types::PUBLISH) {
                    // QoS 1 deliveries must be acknowledged or the broker will resend them.
                    if (($packet['qos'] ?? 0) === 1) {
                        $this->client->send(
                            ['type' => Types::PUBACK, 'message_id' => $packet['message_id']],
                            false
                        );
                    }
                    $this->dispatch((string) $packet['topic'], (string) $packet['message']);
                }
            }

            // Keep the session alive while no traffic is flowing.
            if ($lastActivity <= time() - $this->client->getConfig()->getKeepAlive()) {
                if ($this->client->ping()) {
                    $lastActivity = time();
                }
            }
        }
    }

    /**
     * Default handling of a device message: persist it, then evaluate alarm rules.
     *
     * Messages whose topic carries no device id or whose payload is not a
     * JSON object/array are ignored.
     *
     * @param string $topic
     * @param string $message JSON payload
     */
    public function processMessage($topic, $message)
    {
        $deviceId = $this->extractDeviceId($topic);
        $data = json_decode($message, true);

        if (!is_array($data) || $data === [] || $deviceId === null || $deviceId === '') {
            return;
        }

        // Resolve by class name: the bare string 'DataService' is not a container binding.
        app(DataService::class)->saveDeviceData($deviceId, $data);
        app(AlarmService::class)->checkRules($deviceId, $data);
    }

    /**
     * Call every handler whose topic filter matches the given topic.
     */
    private function dispatch(string $topic, string $message): void
    {
        foreach ($this->handlers as $filter => $handler) {
            if ($this->topicMatches((string) $filter, $topic)) {
                $handler($topic, $message);
            }
        }
    }

    /**
     * MQTT topic-filter matching: `+` matches one level, `#` matches the rest.
     */
    private function topicMatches(string $filter, string $topic): bool
    {
        $filterLevels = explode('/', $filter);
        $topicLevels = explode('/', $topic);

        foreach ($filterLevels as $index => $level) {
            if ($level === '#') {
                return true;
            }
            if (!isset($topicLevels[$index])) {
                return false;
            }
            if ($level !== '+' && $level !== $topicLevels[$index]) {
                return false;
            }
        }

        return count($filterLevels) === count($topicLevels);
    }

    /**
     * Return the second topic level (device/{deviceId}/...), or null when absent.
     */
    private function extractDeviceId($topic)
    {
        $parts = explode('/', $topic);

        return $parts[1] ?? null;
    }
}
2. 设备管理服务
// app/service/DeviceService.php
namespace app\service;

use app\model\device\Device;
use InvalidArgumentException;
use RuntimeException;
use think\facade\Db;

/**
 * Device registry operations: registration, connection parameters, listing.
 */
class DeviceService
{
    /**
     * Register a device, or mark an already known device as active.
     *
     * A device is identified by the pair product_key + device_name. An
     * existing device gets its last_active_time and status refreshed; a new
     * one is created with a freshly generated secret.
     *
     * @param array $data must contain non-empty 'product_key' and 'device_name'
     * @return Device
     * @throws InvalidArgumentException when an identifying key is missing
     */
    public function register($data)
    {
        foreach (['product_key', 'device_name'] as $key) {
            if (!isset($data[$key]) || $data[$key] === '') {
                throw new InvalidArgumentException("Missing required field: {$key}");
            }
        }

        $now = time();
        $device = Device::where('product_key', $data['product_key'])
            ->where('device_name', $data['device_name'])
            ->find();

        if ($device) {
            $device->save([
                'last_active_time' => $now,
                'status' => 1,
            ]);

            return $device;
        }

        return Device::create([
            'product_key' => $data['product_key'],
            'device_name' => $data['device_name'],
            'device_secret' => $this->generateSecret(),
            'status' => 1,
            'create_time' => $now,
            'last_active_time' => $now,
        ]);
    }

    /**
     * Generate a 32-character hexadecimal device secret.
     *
     * Uses the CSPRNG: the secret doubles as the MQTT password, so it must
     * not be derivable from the clock the way md5(uniqid()) is.
     */
    private function generateSecret()
    {
        return bin2hex(random_bytes(16));
    }

    /**
     * Build the MQTT connection parameters for a device.
     *
     * @param int|string $deviceId primary key of the device
     * @return array host, port, client_id, username, password and data topic
     * @throws RuntimeException when the device does not exist
     */
    public function getConnectConfig($deviceId)
    {
        $device = Device::find($deviceId);
        if (!$device) {
            // Fully imported class: an unqualified Exception would resolve inside app\service.
            throw new RuntimeException('设备不存在');
        }

        return [
            'host' => config('mqtt.host'),
            'port' => config('mqtt.port'),
            'client_id' => $device->product_key . '|' . $device->device_name,
            'username' => $device->device_name,
            'password' => $device->device_secret,
            'topic' => "device/{$deviceId}/data",
        ];
    }

    /**
     * Paginated device list (10 per page), filtered through the model's
     * `status` and `keyword` searchers.
     *
     * @param array $params search values passed to withSearch()
     */
    public function getDeviceList($params)
    {
        $query = Device::withSearch(['status', 'keyword'], $params);

        return $query->paginate(10);
    }
}
四、数据处理服务
1. 时序数据存储
// app/service/DataService.php
namespace app\service;

use InfluxDB2\Client as InfluxClient;
use InfluxDB2\Model\WritePrecision;
use InvalidArgumentException;
use think\facade\Config;
use think\facade\Db;

/**
 * Stores device readings in InfluxDB (with a MySQL copy) and queries them back.
 */
class DataService
{
    /** @var InfluxClient */
    private $client;

    /** @var string */
    private $bucket;

    /** @var string */
    private $org;

    /**
     * Build the InfluxDB client from the `database.connections.influxdb` config entry.
     */
    public function __construct()
    {
        $config = Config::get('database.connections.influxdb');
        $this->client = new InfluxClient([
            'url' => $config['host'],
            'token' => $config['token'],
        ]);
        $this->bucket = $config['bucket'];
        $this->org = $config['org'];
    }

    /**
     * Persist one reading of a device.
     *
     * Scalar values are written as fields of a `device_metrics` point tagged
     * with the device id; the complete payload is additionally stored as JSON
     * in the `device_data` table as a backup.
     *
     * @param int|string $deviceId
     * @param array      $data field name => value
     */
    public function saveDeviceData($deviceId, $data)
    {
        // InfluxDB fields must be scalar; nested structures only go to the MySQL copy.
        $fields = array_filter((array) $data, static function ($value) {
            return is_int($value) || is_float($value) || is_string($value) || is_bool($value);
        });

        // A point without fields is rejected by InfluxDB, so skip the write entirely.
        if ($fields !== []) {
            $writeApi = $this->client->createWriteApi();
            try {
                $writeApi->write(
                    [
                        'name' => 'device_metrics',
                        'tags' => ['device_id' => (string) $deviceId],
                        'fields' => $fields,
                        // Must be nanoseconds to agree with WritePrecision::NS below.
                        'time' => $this->nowInNanoseconds(),
                    ],
                    WritePrecision::NS,
                    $this->bucket,
                    $this->org
                );
            } finally {
                $writeApi->close();
            }
        }

        Db::name('device_data')->insert([
            'device_id' => $deviceId,
            'data' => json_encode($data),
            'create_time' => time(),
        ]);
    }

    /**
     * Query the stored readings of a device.
     *
     * @param int|string $deviceId
     * @param string     $start  Flux time: duration (e.g. "-1h"), "now()", unix seconds or RFC3339
     * @param string     $stop   same forms as $start
     * @param string[]   $fields restrict the result to these field names; empty = all fields
     * @return mixed result of the client's query call
     * @throws InvalidArgumentException when $start or $stop is not a recognised time expression
     */
    public function queryDeviceData($deviceId, $start, $stop, $fields = [])
    {
        $queryApi = $this->client->createQueryApi();

        // Every caller-supplied value is validated or escaped before it enters
        // the Flux source, so it cannot alter the structure of the query.
        $lines = [
            'from(bucket: ' . $this->fluxString($this->bucket) . ')',
            '|> range(start: ' . $this->fluxTime($start) . ', stop: ' . $this->fluxTime($stop) . ')',
            '|> filter(fn: (r) => r["_measurement"] == "device_metrics")',
            '|> filter(fn: (r) => r["device_id"] == ' . $this->fluxString($deviceId) . ')',
        ];

        if (!empty($fields)) {
            $fieldFilters = array_map(function ($field) {
                return 'r["_field"] == ' . $this->fluxString($field);
            }, $fields);
            $lines[] = '|> filter(fn: (r) => ' . implode(' or ', $fieldFilters) . ')';
        }

        return $queryApi->query(implode("\n", $lines), $this->org);
    }

    /**
     * Current wall-clock time as integer nanoseconds since the Unix epoch.
     */
    private function nowInNanoseconds(): int
    {
        // microtime() without argument yields "<fraction> <seconds>"; combining the
        // parts as integers avoids losing precision in one large float.
        [$fraction, $seconds] = explode(' ', microtime());

        return (int) $seconds * 1000000000 + (int) round((float) $fraction * 1000000000);
    }

    /**
     * Quote a value as a Flux string literal (escapes `\`, `"` and `${`).
     */
    private function fluxString($value): string
    {
        return '"' . strtr((string) $value, [
            '\\' => '\\\\',
            '"' => '\\"',
            '${' => '\\${',
        ]) . '"';
    }

    /**
     * Validate a range boundary and return it unchanged.
     *
     * @throws InvalidArgumentException for anything that is not a duration,
     *                                  now(), an integer or an RFC3339 date/time
     */
    private function fluxTime($value): string
    {
        $value = trim((string) $value);
        $duration = '-?(?:\d+(?:ns|us|µs|ms|s|m|h|d|w|mo|y))+';
        $timestamp = '\d{4}-\d{2}-\d{2}(?:T\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:\d{2}))?';

        if (!preg_match('/^(?:now\(\)|\d+|' . $duration . '|' . $timestamp . ')$/u', $value)) {
            throw new InvalidArgumentException('Invalid Flux time expression: ' . $value);
        }

        return $value;
    }
}
2. 实时数据推送
// app/task/WebSocketServer.php
namespace app\task;

// The think-swoole class is spelled "Websocket"; the exact case matters for
// PSR-4 autoloading on case-sensitive filesystems.
use think\swoole\Websocket;
use think\swoole\websocket\Room;

/**
 * Pushes device data and system messages to WebSocket clients.
 *
 * Clients are grouped into one room per device, named `device_{id}`.
 */
class WebSocketServer
{
    /** Prefix of the per-device room name. */
    private const ROOM_PREFIX = 'device_';

    /** @var Websocket */
    protected $websocket;

    /** @var Room */
    protected $room;

    public function __construct(Websocket $websocket, Room $room)
    {
        $this->websocket = $websocket;
        $this->room = $room;
    }

    /**
     * Connection-open hook: put the client into the room of the device named
     * by the `device_id` query parameter. Connections without one join no room.
     *
     * @param mixed $server  unused
     * @param mixed $request exposes `fd` and the `get` query array
     */
    public function onOpen($server, $request)
    {
        $deviceId = $request->get['device_id'] ?? 0;
        if (!$deviceId) {
            return;
        }

        $this->room->add($request->fd, self::ROOM_PREFIX . $deviceId);
    }

    /**
     * Emit a `data` event to every client watching the given device.
     *
     * @param int|string $deviceId
     * @param mixed      $data event payload
     */
    public function pushToDevice($deviceId, $data)
    {
        $this->websocket->to(self::ROOM_PREFIX . $deviceId)->emit('data', $data);
    }

    /**
     * Broadcast a system message to the connected clients.
     *
     * @param string $event event name
     * @param mixed  $data  event payload
     */
    public function broadcast($event, $data)
    {
        $this->websocket->broadcast()->emit($event, $data);
    }
}
// Forward every device report received over MQTT to the WebSocket clients
// watching that device.
$mqttClient->subscribe('device/+/data', function ($topic, $message) {
    // Topic layout is device/{deviceId}/data, so the id is the second level.
    // (MqttClient::extractDeviceId() is private and cannot be called from here.)
    $deviceId = explode('/', $topic)[1] ?? null;
    $data = json_decode($message, true);

    // Ignore malformed topics and payloads instead of pushing null to the browser.
    if ($deviceId === null || $deviceId === '' || !is_array($data)) {
        return;
    }

    // Resolve by class name: the bare string 'WebSocketServer' is not a container binding.
    app(\app\task\WebSocketServer::class)->pushToDevice($deviceId, [
        'type' => 'realtime',
        'data' => $data,
        'time' => time(),
    ]);
});
五、告警规则引擎
1. 规则模型设计
// app/model/device/AlarmRule.php
namespace app\model\device;

use think\Model;

/**
 * Alarm rule attached to a device: "field <condition> threshold".
 *
 * NOTE(review): part of this class was lost when the listing was extracted
 * (everything after a "<" character was swallowed). The '<' operator, the
 * device() relation and the use of STATUS_ENABLED are evidenced by the
 * remaining code; the literal values of CONDITION_EQ, CONDITION_NEQ and the
 * STATUS_* constants are reconstructions — confirm them against the data
 * stored in device_alarm_rule.
 */
class AlarmRule extends Model
{
    protected $table = 'device_alarm_rule';

    // Rule trigger conditions
    public const CONDITION_GT = '>';   // greater than
    public const CONDITION_LT = '<';   // less than
    public const CONDITION_EQ = '=';   // equal to
    public const CONDITION_NEQ = '!='; // not equal to

    // Rule status
    public const STATUS_DISABLED = 0;
    public const STATUS_ENABLED = 1;

    /**
     * The device this rule belongs to.
     */
    public function device()
    {
        return $this->belongsTo(Device::class);
    }

    /**
     * Map of condition operator => display label.
     *
     * @return array<string, string>
     */
    public static function getConditions()
    {
        return [
            self::CONDITION_GT => '大于',
            self::CONDITION_LT => '小于',
            self::CONDITION_EQ => '等于',
            self::CONDITION_NEQ => '不等于'
        ];
    }

    /**
     * Tell whether a reading triggers this rule.
     *
     * Comparisons are deliberately loose so that numeric strings coming from
     * the database or from JSON compare by value.
     *
     * @param mixed $value current reading of the rule's field
     * @return bool false for an unknown condition
     */
    public function check($value)
    {
        switch ($this->condition) {
            case self::CONDITION_GT:
                return $value > $this->threshold;
            case self::CONDITION_LT:
                return $value < $this->threshold;
            case self::CONDITION_EQ:
                return $value == $this->threshold;
            case self::CONDITION_NEQ:
                return $value != $this->threshold;
            default:
                return false;
        }
    }
}
2. 告警服务实现
// app/service/AlarmService.php
namespace app\service;

use app\model\device\AlarmLog;
use app\model\device\AlarmRule;
use app\task\WebSocketServer;

/**
 * Evaluates alarm rules against incoming device data and raises alarms.
 */
class AlarmService
{
    /**
     * Check a reading against every enabled rule of the device.
     *
     * A rule is evaluated only when the reading contains the rule's field.
     *
     * @param int|string $deviceId
     * @param array      $data field name => value
     */
    public function checkRules($deviceId, $data)
    {
        $rules = AlarmRule::where('device_id', $deviceId)
            ->where('status', AlarmRule::STATUS_ENABLED)
            ->select();

        foreach ($rules as $rule) {
            if (!isset($data[$rule->field])) {
                continue;
            }

            $value = $data[$rule->field];
            if ($rule->check($value)) {
                $this->triggerAlarm($rule, $value);
            }
        }
    }

    /**
     * Record the alarm, push it to the device's WebSocket room and, when the
     * rule has a notify_type, hand it to the external notification hook.
     *
     * @param AlarmRule $rule
     * @param mixed     $currentValue the value that triggered the rule
     */
    private function triggerAlarm($rule, $currentValue)
    {
        $now = time();

        AlarmLog::create([
            'rule_id' => $rule->id,
            'device_id' => $rule->device_id,
            'trigger_value' => $currentValue,
            'create_time' => $now,
        ]);

        // The related device may have been deleted; fall back to its id so the
        // alarm is still delivered instead of failing on a null relation.
        $device = $rule->device;
        $deviceName = $device ? $device->device_name : $rule->device_id;

        $message = sprintf(
            "设备%s触发告警规则:%s %s %s,当前值:%s",
            $deviceName,
            $rule->field,
            $rule->condition,
            $rule->threshold,
            // sprintf cannot format arrays; show structured values as JSON.
            is_scalar($currentValue) ? $currentValue : json_encode($currentValue)
        );

        app(WebSocketServer::class)->pushToDevice($rule->device_id, [
            'type' => 'alarm',
            'data' => [
                'rule_id' => $rule->id,
                'message' => $message,
                'time' => $now,
            ],
        ]);

        if ($rule->notify_type) {
            $this->sendNotification($rule, $message);
        }
    }

    /**
     * External notification hook (e-mail / SMS). Intentionally empty: the
     * delivery logic is left to the integrator and can be moved to an
     * asynchronous queue.
     *
     * @param AlarmRule $rule
     * @param string    $message
     */
    private function sendNotification($rule, $message)
    {
    }
}
六、数据可视化
1. 实时监控仪表盘
// app/controller/Dashboard.php
namespace app\controller;

use app\model\device\Device;
use app\service\DataService;
use app\service\DeviceService;
use think\facade\View;

/**
 * Monitoring dashboard: renders the device page and serves chart data.
 */
class Dashboard
{
    /** Fields plotted by the real-time chart. */
    private const CHART_FIELDS = ['temperature', 'humidity'];

    /**
     * Render the dashboard page of one device.
     *
     * The device is loaded through its model and the metrics come from
     * DataService::queryDeviceData(): the service classes define no
     * getDevice() / getDeviceMetrics() methods to call.
     *
     * @param int|string $deviceId
     */
    public function index($deviceId)
    {
        $device = Device::find($deviceId);
        if (!$device) {
            abort(404, '设备不存在');
        }

        // NOTE(review): "metrics" is taken to be the last hour of readings — confirm
        // against what the view template expects.
        $history = app(DataService::class)->queryDeviceData($deviceId, '-1h', 'now()');

        return View::assign([
            'device' => $device,
            'metrics' => $this->formatChartData($history),
        ])->fetch();
    }

    /**
     * JSON endpoint: last hour of temperature and humidity readings.
     *
     * @param int|string $deviceId
     */
    public function getRealtimeData($deviceId)
    {
        $data = app(DataService::class)->queryDeviceData(
            $deviceId,
            '-1h',
            'now()',
            self::CHART_FIELDS
        );

        return json([
            'code' => 1,
            'data' => $this->formatChartData($data),
        ]);
    }

    /**
     * Pivot the query result into one row per timestamp.
     *
     * The result holds one record per (time, field); the chart wants
     * ['time' => t, '<field>' => value, ...] rows instead.
     *
     * @param iterable|null $influxData tables exposing a `records` list
     * @return array list of rows in order of first appearance
     */
    private function formatChartData($influxData)
    {
        $rows = [];

        foreach ($influxData ?: [] as $table) {
            foreach ($table->records as $record) {
                $time = $record->getTime();
                if (!isset($rows[$time])) {
                    $rows[$time] = ['time' => $time];
                }
                $rows[$time][$record->getField()] = $record->getValue();
            }
        }

        return array_values($rows);
    }
}
2. 前端实时图表
// Draw the real-time chart with ECharts
const chart = echarts.init(document.getElementById('chart'));

// Seed the two series up front: updateRealtimeChart() appends to
// series[0] (temperature) and series[1] (humidity) and expects them to exist.
chart.setOption({
    tooltip: { trigger: 'axis' },
    legend: { data: ['temperature', 'humidity'] },
    xAxis: { type: 'time' },
    yAxis: { type: 'value' },
    series: [
        { name: 'temperature', type: 'line', showSymbol: false, data: [] },
        { name: 'humidity', type: 'line', showSymbol: false, data: [] }
    ]
});

// Use wss:// on https pages, otherwise the browser blocks the connection as mixed content.
const socketScheme = location.protocol === 'https:' ? 'wss' : 'ws';
const socket = new WebSocket(
    `${socketScheme}://${location.host}/ws?device_id=${encodeURIComponent(deviceId)}`
);

// Handle incoming WebSocket messages
socket.addEventListener('message', event => {
    let data;
    try {
        data = JSON.parse(event.data);
    } catch (error) {
        // Not a JSON frame (e.g. a heartbeat); nothing to render.
        return;
    }
    if (!data) {
        return;
    }

    if (data.type === 'realtime') {
        updateRealtimeChart(data.data);
    } else if (data.type === 'alarm') {
        showAlarmNotification(data.data);
    }
});
/**
 * Append one reading to the real-time chart.
 *
 * series[0] receives data.temperature and series[1] receives data.humidity,
 * both stamped with the current time. Each series keeps at most its 100
 * latest points. A reading that is missing from the message is skipped
 * rather than plotted as an undefined point.
 *
 * @param {{temperature?: number, humidity?: number}} data
 */
function updateRealtimeChart(data) {
    const MAX_POINTS = 100;
    const option = chart.getOption();

    // Nothing to append to until the chart has been given its two series.
    if (!data || !option || !Array.isArray(option.series) || option.series.length < 2) {
        return;
    }

    const now = new Date();
    const readings = [data.temperature, data.humidity];

    const series = readings.map((reading, index) => {
        const points = option.series[index].data || [];

        if (reading !== undefined && reading !== null) {
            points.push({
                name: now.toString(),
                value: [now, reading]
            });
        }

        // Trim each series on its own so the two can never drift apart in length limits.
        while (points.length > MAX_POINTS) {
            points.shift();
        }

        return { data: points };
    });

    // Only the series data changed; leave the rest of the option untouched.
    chart.setOption({ series });
}
/**
 * Show an alarm banner for five seconds.
 *
 * The message contains values that originate from devices (device name,
 * field name), so it is inserted as text, never as HTML.
 *
 * @param {{message: string, time: number}} data `time` is a Unix timestamp in seconds
 */
function showAlarmNotification(data) {
    const notification = document.createElement('div');
    notification.className = 'alarm-notification';

    const title = document.createElement('strong');
    title.textContent = '设备告警';

    const message = document.createElement('p');
    message.textContent = data.message;

    const time = document.createElement('small');
    time.textContent = new Date(data.time * 1000).toLocaleString();

    notification.append(title, message, time);
    document.body.appendChild(notification);

    setTimeout(() => {
        notification.remove();
    }, 5000);
}
七、系统部署方案
1. Docker生产环境部署
# docker-compose.yml
version: '3.8'

services:
  # Application container (Swoole HTTP + WebSocket server)
  app:
    build:
      context: .
      dockerfile: Dockerfile
    image: tp-iot-platform
    container_name: iot-app
    restart: unless-stopped
    ports:
      - "9501:9501"
      - "9502:9502" # WebSocket port
    depends_on:
      - influxdb
      - rabbitmq
      - redis

  # Time-series store; the INIT_* values must match the application's InfluxDB settings.
  # Replace the sample password and token before deploying.
  influxdb:
    image: influxdb:2.0
    container_name: iot-influxdb
    ports:
      - "8086:8086"
    volumes:
      - influx_data:/var/lib/influxdb2
    environment:
      DOCKER_INFLUXDB_INIT_MODE: setup
      DOCKER_INFLUXDB_INIT_USERNAME: admin
      DOCKER_INFLUXDB_INIT_PASSWORD: admin123
      DOCKER_INFLUXDB_INIT_ORG: iot-org
      DOCKER_INFLUXDB_INIT_BUCKET: iot-bucket
      DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: your-token

  rabbitmq:
    image: rabbitmq:3.9-management
    container_name: iot-rabbitmq
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - rabbit_data:/var/lib/rabbitmq

  # The app service lists redis under depends_on; without this definition
  # Compose rejects the file because the dependency is undefined.
  redis:
    image: redis:6-alpine
    container_name: iot-redis
    restart: unless-stopped

volumes:
  influx_data:
  rabbit_data:
八、总结与扩展
本教程构建了一个完整的物联网平台:
- 实现了多协议设备接入
- 开发了时序数据存储
- 构建了实时监控系统
- 设计了告警规则引擎
- 实现了数据可视化
扩展方向:
- 边缘计算集成
- AI异常检测
- 设备固件OTA升级
- 多租户支持