从零构建高并发在线即时通讯系统的完整指南
一、实时通讯系统架构设计
现代Web应用实时通讯需要解决的核心问题:
- 双向通信:服务端主动推送能力
- 高并发连接:万级长连接管理
- 消息可靠性:消息必达与离线存储
- 多协议支持:兼容HTTP与WebSocket
- 水平扩展:支持分布式部署
系统架构图
HTTP API → 业务逻辑层 → WebSocket服务 → 消息队列 ↑ ↑ ↑ ↑ 前端应用 用户认证 连接管理 异步处理
二、核心服务实现
1. WebSocket服务端集成
<?php
// websocket.php 服务启动文件
use thinkswooleWebsocket;
use thinkswoolewebsocketRoom;
return [
'server' => [
'host' => '0.0.0.0',
'port' => 9502,
'options' => [
'worker_num' => 4,
'enable_coroutine' => true,
]
],
'websocket' => [
'enable' => true,
'handler' => appwebsocketWebSocketHandler::class
],
'room' => [
'type' => 'table',
'table' => [
'room_rows' => 8192,
'room_size' => 2048,
]
]
];
// WebSocketHandler.php 核心处理器
namespace appwebsocket;
use thinkswoolewebsocketWebsocketHandler;
use thinkswoolewebsocketRoom;
class WebSocketHandler implements WebsocketHandler
{
protected $room;
public function __construct(Room $room)
{
$this->room = $room;
}
// 连接建立时触发
public function onOpen($server, $request)
{
$userId = $request->get['uid'] ?? 0;
if (!$userId) {
$server->close($request->fd);
return;
}
// 将用户ID与连接FD绑定
$this->room->add($userId, $request->fd);
$server->push($request->fd, json_encode([
'type' => 'system',
'msg' => '连接成功'
]));
}
// 收到消息时触发
public function onMessage($server, $frame)
{
$data = json_decode($frame->data, true);
switch ($data['type']) {
case 'chat':
$this->handleChatMessage($server, $frame->fd, $data);
break;
// 其他消息类型处理...
}
}
protected function handleChatMessage($server, $fd, $data)
{
$to = $data['to']; // 接收者ID
$fds = $this->room->getClients($to);
foreach ($fds as $clientFd) {
$server->push($clientFd, json_encode([
'type' => 'chat',
'from' => $data['from'],
'content' => $data['content'],
'time' => time()
]));
}
}
// 连接关闭时触发
public function onClose($server, $fd)
{
$this->room->delete($fd);
}
}
2. 用户认证与消息存储
<?php
// 消息控制器
namespace appcontroller;
use thinkfacadeDb;
use thinkswooleWebsocket;
class Message
{
protected $websocket;
public function __construct(Websocket $websocket)
{
$this->websocket = $websocket;
}
// 发送消息API
public function send()
{
$data = request()->post();
$from = request()->userId; // JWT认证获取的用户ID
// 消息入库
$messageId = Db::name('messages')->insertGetId([
'from_uid' => $from,
'to_uid' => $data['to'],
'content' => $data['content'],
'status' => 0, // 未读
'create_time' => time()
]);
// 实时推送
$this->websocket->to($data['to'])->emit('chat', [
'id' => $messageId,
'from' => $from,
'content' => $data['content'],
'time' => time()
]);
return json(['code' => 200, 'msg' => '发送成功']);
}
// 获取历史消息
public function history()
{
$userId = request()->userId;
$to = request()->get('to');
$list = Db::name('messages')
->where(function($query) use ($userId, $to) {
$query->where('from_uid', $userId)
->where('to_uid', $to);
})
->whereOr(function($query) use ($userId, $to) {
$query->where('from_uid', $to)
->where('to_uid', $userId);
})
->order('id', 'desc')
->limit(20)
->select();
return json(['code' => 200, 'data' => array_reverse($list)]);
}
}
三、高级功能实现
1. 消息已读回执
<?php
// 已读回执处理
public function read()
{
$userId = request()->userId;
$messageIds = request()->post('ids');
Db::name('messages')
->where('to_uid', $userId)
->where('id', 'in', $messageIds)
->update([
'status' => 1, // 已读
'read_time' => time()
]);
// 通知发送方消息已读
$messages = Db::name('messages')
->where('id', 'in', $messageIds)
->column('from_uid', 'id');
$fromUsers = array_unique(array_values($messages));
foreach ($fromUsers as $from) {
$this->websocket->to($from)->emit('read', [
'ids' => array_keys(array_filter($messages, function($v) use ($from) {
return $v == $from;
}))
]);
}
return json(['code' => 200]);
}
// WebSocketHandler中添加处理
case 'read':
$this->handleReadReceipt($server, $frame->fd, $data);
break;
protected function handleReadReceipt($server, $fd, $data)
{
$messageIds = $data['ids'];
$from = $this->room->getUid($fd);
Db::name('messages')
->where('from_uid', $from)
->where('id', 'in', $messageIds)
->update(['status' => 1]);
}
2. 消息撤回功能
<?php
public function recall()
{
$userId = request()->userId;
$messageId = request()->post('id');
$message = Db::name('messages')
->where('id', $messageId)
->where('from_uid', $userId)
->find();
if (!$message) {
return json(['code' => 404, 'msg' => '消息不存在']);
}
// 超过2分钟不能撤回
if (time() - $message['create_time'] > 120) {
return json(['code' => 400, 'msg' => '超过撤回时限']);
}
Db::name('messages')
->where('id', $messageId)
->update([
'content' => '[消息已撤回]',
'is_recall' => 1
]);
// 通知双方撤回消息
$this->websocket->to($message['to_uid'])->emit('recall', [
'id' => $messageId
]);
$this->websocket->to($userId)->emit('recall', [
'id' => $messageId
]);
return json(['code' => 200]);
}
四、性能优化策略
1. 消息队列异步处理
<?php
// 消息队列任务
namespace appjob;
use thinkqueueJob;
use thinkswooleWebsocket;
class PushMessage
{
protected $websocket;
public function __construct(Websocket $websocket)
{
$this->websocket = $websocket;
}
public function fire(Job $job, $data)
{
try {
$this->websocket->to($data['to'])->emit($data['type'], $data['content']);
$job->delete();
} catch (Exception $e) {
// 失败重试
if ($job->attempts() > 3) {
$job->delete();
} else {
$job->release(10);
}
}
}
}
// 控制器中使用队列
public function send()
{
// ...消息入库逻辑
// 异步推送
thinkQueue::push('appjobPushMessage', [
'to' => $data['to'],
'type' => 'chat',
'content' => [
'id' => $messageId,
'from' => $from,
'content' => $data['content'],
'time' => time()
]
]);
return json(['code' => 200]);
}
2. 连接心跳检测
<?php
// 修改websocket配置
'options' => [
'worker_num' => 4,
'enable_coroutine' => true,
'heartbeat_idle_time' => 60,
'heartbeat_check_interval' => 30
]
// WebSocketHandler中添加心跳处理
public function onMessage($server, $frame)
{
if ($frame->data === 'PING') {
$server->push($frame->fd, 'PONG');
return;
}
// ...原有消息处理逻辑
}
// 前端定时发送心跳
setInterval(() => {
if (socket.readyState === WebSocket.OPEN) {
socket.send('PING');
}
}, 50000); // 50秒发送一次
五、实战案例:在线客服系统
1. 客服分配策略
<?php
class CustomerService
{
protected $onlineAgents = [];
// 客服上线
public function agentOnline($agentId, $fd)
{
$this->onlineAgents[$agentId] = [
'fd' => $fd,
'current' => 0, // 当前接待数
'max' => 5 // 最大接待数
];
}
// 分配客服
public function assignAgent()
{
// 按当前接待数升序排序
uasort($this->onlineAgents, function($a, $b) {
return $a['current'] $b['current'];
});
foreach ($this->onlineAgents as $agentId => $agent) {
if ($agent['current'] onlineAgents[$agentId]['current']++;
return $agentId;
}
}
return false; // 没有可用客服
}
// 会话结束
public function sessionEnd($agentId)
{
if (isset($this->onlineAgents[$agentId])) {
$this->onlineAgents[$agentId]['current']--;
}
}
}
2. 会话状态管理
<?php
class SessionManager
{
protected $sessions = [];
// 创建会话
public function create($userId, $agentId)
{
$sessionId = uniqid();
$this->sessions[$sessionId] = [
'user_id' => $userId,
'agent_id' => $agentId,
'start_time' => time(),
'messages' => []
];
return $sessionId;
}
// 添加消息
public function addMessage($sessionId, $message)
{
if (isset($this->sessions[$sessionId])) {
$this->sessions[$sessionId]['messages'][] = $message;
}
}
// 结束会话
public function close($sessionId)
{
if (isset($this->sessions[$sessionId])) {
$session = $this->sessions[$sessionId];
unset($this->sessions[$sessionId]);
return $session;
}
return false;
}
}