基于MQTT协议构建企业级设备管理系统的完整方案
一、物联网平台技术选型
现代物联网平台核心组件对比:
组件类型 | 常用方案 | PHP适配性 |
---|---|---|
通信协议 | MQTT/CoAP/HTTP | MQTT最佳支持 |
消息代理 | Mosquitto/EMQX | Mosquitto最稳定 |
设备认证 | JWT/X.509证书 | JWT实现简单 |
数据存储 | 时序数据库/MySQL | MySQL+Redis缓存 |
二、系统架构设计
1. 分层架构设计
设备层 → 协议接入层 → 业务处理层 → 数据存储层 → 应用层 ↑ ↑ ↑ ↑ ↑ 传感器 MQTT代理 设备管理服务 时序数据库 Web管理端
2. 数据流设计
设备上报 → MQTT主题 → 消息处理 → 数据持久化 → 可视化展示
↑ ↑ ↑ ↑
二进制协议 QoS等级控制 规则引擎处理 实时WebSocket推送
三、核心模块实现
1. MQTT服务集成
<?php
class MqttService
{
private $server;
private $loop;
private $client;
public function __construct($config)
{
$this->loop = ReactEventLoopFactory::create();
$this->server = new MosquittoClient();
$this->server->onConnect(function($code, $message) {
echo "MQTT服务已连接n";
});
$this->server->onMessage(function($message) {
$this->handleIncomingMessage(
$message->topic,
$message->payload
);
});
$this->connect($config);
}
private function connect($config)
{
$this->server->setCredentials(
$config['username'],
$config['password']
);
$this->server->connect(
$config['host'],
$config['port'],
60
);
// 订阅设备主题
$this->server->subscribe('device/+/data', 1);
$this->server->subscribe('device/+/status', 1);
}
public function handleIncomingMessage($topic, $payload)
{
$parts = explode('/', $topic);
$deviceId = $parts[1];
$messageType = $parts[2];
$processor = new DeviceMessageProcessor();
switch ($messageType) {
case 'data':
$processor->processDeviceData($deviceId, $payload);
break;
case 'status':
$processor->processDeviceStatus($deviceId, $payload);
break;
}
}
public function publish($topic, $message, $qos = 1)
{
$this->server->publish($topic, $message, $qos);
}
public function run()
{
$this->loop->addPeriodicTimer(0.01, function() {
$this->server->loop();
});
$this->loop->run();
}
}
// 启动服务
$config = [
'host' => 'mqtt.example.com',
'port' => 1883,
'username' => 'php-service',
'password' => 'securepassword'
];
$mqttService = new MqttService($config);
$mqttService->run();
2. 设备消息处理器
<?php
class DeviceMessageProcessor
{
private $db;
private $cache;
public function __construct()
{
$this->db = new PDO('mysql:host=localhost;dbname=iot', 'user', 'pass');
$this->cache = new Redis();
$this->cache->connect('127.0.0.1', 6379);
}
public function processDeviceData($deviceId, $payload)
{
$data = json_decode($payload, true);
// 验证设备
if (!$this->validateDevice($deviceId)) {
return false;
}
// 数据校验
$validated = $this->validateData($deviceId, $data);
if (!$validated) {
return false;
}
// 存储到数据库
$stmt = $this->db->prepare(
"INSERT INTO device_data
(device_id, temperature, humidity, voltage, created_at)
VALUES (?, ?, ?, ?, NOW())"
);
$stmt->execute([
$deviceId,
$data['temp'] ?? null,
$data['humi'] ?? null,
$data['volt'] ?? null
]);
// 更新设备缓存状态
$this->cache->hMSet("device:{$deviceId}:latest", [
'temperature' => $data['temp'],
'updated_at' => time()
]);
// 触发规则引擎
$this->triggerRules($deviceId, $data);
return true;
}
private function validateDevice($deviceId)
{
// 检查缓存
if ($this->cache->exists("device:{$deviceId}:active")) {
return true;
}
// 查询数据库
$stmt = $this->db->prepare(
"SELECT id FROM devices WHERE id = ? AND status = 'active'"
);
$stmt->execute([$deviceId]);
if ($stmt->fetch()) {
$this->cache->set("device:{$deviceId}:active", 1, 3600);
return true;
}
return false;
}
}
四、高级功能实现
1. 设备影子服务
<?php
class DeviceShadowService
{
private $db;
private $mqtt;
public function __construct($mqttService)
{
$this->db = new PDO('mysql:host=localhost;dbname=iot', 'user', 'pass');
$this->mqtt = $mqttService;
}
public function updateShadow($deviceId, $desiredState)
{
// 获取当前设备状态
$reported = $this->getReportedState($deviceId);
// 生成差异文档
$delta = $this->calculateDelta($reported, $desiredState);
if (!empty($delta)) {
// 保存期望状态
$this->saveDesiredState($deviceId, $desiredState);
// 发布配置更新
$this->mqtt->publish(
"device/{$deviceId}/config",
json_encode($delta)
);
// 记录操作日志
$this->logShadowUpdate($deviceId, $delta);
}
return $delta;
}
private function calculateDelta($reported, $desired)
{
$delta = [];
foreach ($desired as $key => $value) {
if (!isset($reported[$key]) || $reported[$key] != $value) {
$delta[$key] = $value;
}
}
return $delta;
}
public function handleStatusUpdate($deviceId, $reportedState)
{
// 获取期望状态
$desired = $this->getDesiredState($deviceId);
// 检查是否同步完成
$synced = true;
foreach ($desired as $key => $value) {
if (isset($reportedState[$key]) && $reportedState[$key] != $value) {
$synced = false;
break;
}
}
// 更新影子文档
$this->saveShadowDocument($deviceId, [
'reported' => $reportedState,
'desired' => $desired,
'synced' => $synced,
'updated_at' => time()
]);
return $synced;
}
}
2. 规则引擎实现
<?php
class RuleEngine
{
private $rules = [];
private $actions = [
'alert' => 'sendAlert',
'command' => 'sendCommand',
'webhook' => 'triggerWebhook'
];
public function loadRules($deviceType)
{
$json = file_get_contents(__DIR__ . "/rules/{$deviceType}.json");
$this->rules = json_decode($json, true);
}
public function evaluate($deviceId, $data)
{
$triggered = [];
foreach ($this->rules as $rule) {
if ($this->checkConditions($rule['conditions'], $data)) {
$triggered[] = $rule;
// 执行动作
foreach ($rule['actions'] as $action) {
$this->executeAction($action, $deviceId, $data);
}
}
}
return $triggered;
}
private function checkConditions($conditions, $data)
{
foreach ($conditions as $condition) {
$field = $condition['field'];
$operator = $condition['operator'];
$value = $condition['value'];
$actual = $data[$field] ?? null;
if (!$this->compare($actual, $operator, $value)) {
return false;
}
}
return true;
}
private function compare($actual, $operator, $expected)
{
switch ($operator) {
case '>': return $actual > $expected;
case '<': return $actual < $expected;
case '=': return $actual == $expected;
case 'changed':
$last = $this->getLastValue($field);
return $last !== null && $actual != $last;
default: return false;
}
}
private function executeAction($action, $deviceId, $data)
{
$method = $this->actions[$action['type']];
if (method_exists($this, $method)) {
$this->$method($deviceId, $action, $data);
}
}
private function sendAlert($deviceId, $action, $data)
{
$message = str_replace(
array_map(function($k) { return '{'.$k.'}'; }, array_keys($data)),
array_values($data),
$action['message']
);
// 发送到告警系统
AlertService::send($action['target'], $message);
}
}
五、性能优化策略
1. 消息批处理
<?php
class MessageBatcher
{
private $batchSize = 100;
private $batchInterval = 5; // 秒
private $buffer = [];
private $timer;
public function __construct($batchSize, $batchInterval)
{
$this->batchSize = $batchSize;
$this->batchInterval = $batchInterval;
}
public function addMessage($message)
{
$this->buffer[] = $message;
// 达到批量大小立即处理
if (count($this->buffer) >= $this->batchSize) {
$this->processBatch();
}
// 启动定时器
elseif (!isset($this->timer)) {
$this->timer = swoole_timer_after(
$this->batchInterval * 1000,
[$this, 'processBatch']
);
}
}
public function processBatch()
{
if (isset($this->timer)) {
swoole_timer_clear($this->timer);
$this->timer = null;
}
if (!empty($this->buffer)) {
$batch = $this->buffer;
$this->buffer = [];
// 批量插入数据库
$this->bulkInsert($batch);
}
}
private function bulkInsert($messages)
{
$placeholders = [];
$values = [];
foreach ($messages as $msg) {
$placeholders[] = '(?, ?, ?, NOW())';
$values = array_merge($values, [
$msg['device_id'],
$msg['type'],
json_encode($msg['data'])
]);
}
$sql = "INSERT INTO device_messages
(device_id, message_type, content, created_at)
VALUES " . implode(',', $placeholders);
$stmt = $this->db->prepare($sql);
$stmt->execute($values);
}
}
2. 连接池管理
<?php
class ConnectionPool
{
private $pool;
private $config;
private $size;
private $count = 0;
public function __construct($config, $size = 20)
{
$this->config = $config;
$this->size = $size;
$this->pool = new SplQueue();
}
public function getConnection()
{
if (!$this->pool->isEmpty()) {
return $this->pool->dequeue();
}
if ($this->count $this->config['host'],
'port' => $this->config['port'],
'user' => $this->config['user'],
'password' => $this->config['password'],
'database' => $this->config['database'],
'timeout' => $this->config['timeout'] ?? 3.0
]);
return $conn;
}
public function stats()
{
return [
'total' => $this->size,
'idle' => $this->pool->count(),
'active' => $this->count - $this->pool->count()
];
}
}
六、实战案例:智能农业系统
1. 设备注册API
<?php
class DeviceController
{
public function register(Request $request)
{
$data = $request->validate([
'serial_number' => 'required|unique:devices',
'type' => 'required|in:sensor,controller,gateway',
'location' => 'required|json',
'attributes' => 'json'
]);
// 生成设备凭证
$credentials = $this->generateCredentials($data['serial_number']);
// 创建设备
$device = Device::create([
'serial_number' => $data['serial_number'],
'type' => $data['type'],
'status' => 'inactive',
'location' => $data['location'],
'attributes' => $data['attributes'] ?? null,
'auth_key' => $credentials['key'],
'auth_secret' => $credentials['secret']
]);
// 初始化影子文档
ShadowService::initialize($device->id);
return response()->json([
'device_id' => $device->id,
'auth_key' => $credentials['key'],
'auth_secret' => $credentials['secret'],
'mqtt_server' => config('mqtt.host')
]);
}
private function generateCredentials($serial)
{
return [
'key' => Str::random(32),
'secret' => hash_hmac('sha256', $serial, config('app.key'))
];
}
}
2. 数据可视化接口
<?php
class DataController
{
public function getTelemetry(Request $request, $deviceId)
{
$device = Device::findOrFail($deviceId);
$this->authorize('view', $device);
$query = DeviceData::where('device_id', $deviceId)
->orderBy('created_at', 'desc');
if ($request->has('last')) {
$query->limit($request->input('last', 100));
} else {
$query->whereBetween('created_at', [
$request->input('from', now()->subDay()),
$request->input('to', now())
]);
}
$data = $query->get(['temperature', 'humidity', 'voltage', 'created_at']);
return response()->json([
'device_id' => $deviceId,
'data' => $data,
'latest' => Redis::hGetAll("device:{$deviceId}:latest")
]);
}
public function realtimeUpdates($deviceId)
{
$device = Device::findOrFail($deviceId);
$this->authorize('view', $device);
return response()->stream(function() use ($deviceId) {
$loop = 0;
$lastData = null;
while (true) {
$currentData = Redis::hGetAll("device:{$deviceId}:latest");
if ($currentData != $lastData || $loop % 10 === 0) {
echo "data: " . json_encode($currentData) . "nn";
ob_flush();
flush();
$lastData = $currentData;
}
$loop++;
sleep(1);
}
}, 200, [
'Content-Type' => 'text/event-stream',
'Cache-Control' => 'no-cache',
'Connection' => 'keep-alive'
]);
}
}