ThinkPHP6+WebSocket实时消息推送系统开发实战 | 即时通讯解决方案

2025-08-09 0 220

从零构建高并发在线即时通讯系统的完整指南

一、实时通讯系统架构设计

现代Web应用实时通讯需要解决的核心问题:

  • 双向通信:服务端主动推送能力
  • 高并发连接:万级长连接管理
  • 消息可靠性:消息必达与离线存储
  • 多协议支持:兼容HTTP与WebSocket
  • 水平扩展:支持分布式部署

系统架构图

HTTP API → 业务逻辑层 → WebSocket服务 → 消息队列
    ↑           ↑              ↑            ↑
前端应用     用户认证       连接管理     异步处理
            

二、核心服务实现

1. WebSocket服务端集成

<?php
// websocket.php 服务启动文件
use thinkswooleWebsocket;
use thinkswoolewebsocketRoom;

return [
    'server' => [
        'host' => '0.0.0.0',
        'port' => 9502,
        'options' => [
            'worker_num' => 4,
            'enable_coroutine' => true,
        ]
    ],
    'websocket' => [
        'enable' => true,
        'handler' => appwebsocketWebSocketHandler::class
    ],
    'room' => [
        'type' => 'table',
        'table' => [
            'room_rows' => 8192,
            'room_size' => 2048,
        ]
    ]
];

// WebSocketHandler.php 核心处理器
namespace appwebsocket;

use thinkswoolewebsocketWebsocketHandler;
use thinkswoolewebsocketRoom;

class WebSocketHandler implements WebsocketHandler
{
    protected $room;
    
    public function __construct(Room $room)
    {
        $this->room = $room;
    }
    
    // 连接建立时触发
    public function onOpen($server, $request)
    {
        $userId = $request->get['uid'] ?? 0;
        if (!$userId) {
            $server->close($request->fd);
            return;
        }
        
        // 将用户ID与连接FD绑定
        $this->room->add($userId, $request->fd);
        $server->push($request->fd, json_encode([
            'type' => 'system',
            'msg' => '连接成功'
        ]));
    }
    
    // 收到消息时触发
    public function onMessage($server, $frame)
    {
        $data = json_decode($frame->data, true);
        switch ($data['type']) {
            case 'chat':
                $this->handleChatMessage($server, $frame->fd, $data);
                break;
            // 其他消息类型处理...
        }
    }
    
    protected function handleChatMessage($server, $fd, $data)
    {
        $to = $data['to']; // 接收者ID
        $fds = $this->room->getClients($to);
        
        foreach ($fds as $clientFd) {
            $server->push($clientFd, json_encode([
                'type' => 'chat',
                'from' => $data['from'],
                'content' => $data['content'],
                'time' => time()
            ]));
        }
    }
    
    // 连接关闭时触发
    public function onClose($server, $fd)
    {
        $this->room->delete($fd);
    }
}

2. 用户认证与消息存储

<?php
// 消息控制器
namespace appcontroller;

use thinkfacadeDb;
use thinkswooleWebsocket;

class Message
{
    protected $websocket;
    
    public function __construct(Websocket $websocket)
    {
        $this->websocket = $websocket;
    }
    
    // 发送消息API
    public function send()
    {
        $data = request()->post();
        $from = request()->userId; // JWT认证获取的用户ID
        
        // 消息入库
        $messageId = Db::name('messages')->insertGetId([
            'from_uid' => $from,
            'to_uid' => $data['to'],
            'content' => $data['content'],
            'status' => 0, // 未读
            'create_time' => time()
        ]);
        
        // 实时推送
        $this->websocket->to($data['to'])->emit('chat', [
            'id' => $messageId,
            'from' => $from,
            'content' => $data['content'],
            'time' => time()
        ]);
        
        return json(['code' => 200, 'msg' => '发送成功']);
    }
    
    // 获取历史消息
    public function history()
    {
        $userId = request()->userId;
        $to = request()->get('to');
        
        $list = Db::name('messages')
            ->where(function($query) use ($userId, $to) {
                $query->where('from_uid', $userId)
                    ->where('to_uid', $to);
            })
            ->whereOr(function($query) use ($userId, $to) {
                $query->where('from_uid', $to)
                    ->where('to_uid', $userId);
            })
            ->order('id', 'desc')
            ->limit(20)
            ->select();
            
        return json(['code' => 200, 'data' => array_reverse($list)]);
    }
}

三、高级功能实现

1. 消息已读回执

<?php
// 已读回执处理
public function read()
{
    $userId = request()->userId;
    $messageIds = request()->post('ids');
    
    Db::name('messages')
        ->where('to_uid', $userId)
        ->where('id', 'in', $messageIds)
        ->update([
            'status' => 1, // 已读
            'read_time' => time()
        ]);
    
    // 通知发送方消息已读
    $messages = Db::name('messages')
        ->where('id', 'in', $messageIds)
        ->column('from_uid', 'id');
    
    $fromUsers = array_unique(array_values($messages));
    foreach ($fromUsers as $from) {
        $this->websocket->to($from)->emit('read', [
            'ids' => array_keys(array_filter($messages, function($v) use ($from) {
                return $v == $from;
            }))
        ]);
    }
    
    return json(['code' => 200]);
}

// WebSocketHandler中添加处理
case 'read':
    $this->handleReadReceipt($server, $frame->fd, $data);
    break;

protected function handleReadReceipt($server, $fd, $data)
{
    $messageIds = $data['ids'];
    $from = $this->room->getUid($fd);
    
    Db::name('messages')
        ->where('from_uid', $from)
        ->where('id', 'in', $messageIds)
        ->update(['status' => 1]);
}

2. 消息撤回功能

<?php
public function recall()
{
    $userId = request()->userId;
    $messageId = request()->post('id');
    
    $message = Db::name('messages')
        ->where('id', $messageId)
        ->where('from_uid', $userId)
        ->find();
        
    if (!$message) {
        return json(['code' => 404, 'msg' => '消息不存在']);
    }
    
    // 超过2分钟不能撤回
    if (time() - $message['create_time'] > 120) {
        return json(['code' => 400, 'msg' => '超过撤回时限']);
    }
    
    Db::name('messages')
        ->where('id', $messageId)
        ->update([
            'content' => '[消息已撤回]',
            'is_recall' => 1
        ]);
    
    // 通知双方撤回消息
    $this->websocket->to($message['to_uid'])->emit('recall', [
        'id' => $messageId
    ]);
    $this->websocket->to($userId)->emit('recall', [
        'id' => $messageId
    ]);
    
    return json(['code' => 200]);
}

四、性能优化策略

1. 消息队列异步处理

<?php
// 消息队列任务
namespace appjob;

use thinkqueueJob;
use thinkswooleWebsocket;

class PushMessage
{
    protected $websocket;
    
    public function __construct(Websocket $websocket)
    {
        $this->websocket = $websocket;
    }
    
    public function fire(Job $job, $data)
    {
        try {
            $this->websocket->to($data['to'])->emit($data['type'], $data['content']);
            $job->delete();
        } catch (Exception $e) {
            // 失败重试
            if ($job->attempts() > 3) {
                $job->delete();
            } else {
                $job->release(10);
            }
        }
    }
}

// 控制器中使用队列
public function send()
{
    // ...消息入库逻辑
    
    // 异步推送
    thinkQueue::push('appjobPushMessage', [
        'to' => $data['to'],
        'type' => 'chat',
        'content' => [
            'id' => $messageId,
            'from' => $from,
            'content' => $data['content'],
            'time' => time()
        ]
    ]);
    
    return json(['code' => 200]);
}

2. 连接心跳检测

<?php
// 修改websocket配置
'options' => [
    'worker_num' => 4,
    'enable_coroutine' => true,
    'heartbeat_idle_time' => 60,
    'heartbeat_check_interval' => 30
]

// WebSocketHandler中添加心跳处理
public function onMessage($server, $frame)
{
    if ($frame->data === 'PING') {
        $server->push($frame->fd, 'PONG');
        return;
    }
    
    // ...原有消息处理逻辑
}

// 前端定时发送心跳
setInterval(() => {
    if (socket.readyState === WebSocket.OPEN) {
        socket.send('PING');
    }
}, 50000); // 50秒发送一次

五、实战案例:在线客服系统

1. 客服分配策略

<?php
class CustomerService
{
    protected $onlineAgents = [];
    
    // 客服上线
    public function agentOnline($agentId, $fd)
    {
        $this->onlineAgents[$agentId] = [
            'fd' => $fd,
            'current' => 0, // 当前接待数
            'max' => 5      // 最大接待数
        ];
    }
    
    // 分配客服
    public function assignAgent()
    {
        // 按当前接待数升序排序
        uasort($this->onlineAgents, function($a, $b) {
            return $a['current']  $b['current'];
        });
        
        foreach ($this->onlineAgents as $agentId => $agent) {
            if ($agent['current'] onlineAgents[$agentId]['current']++;
                return $agentId;
            }
        }
        
        return false; // 没有可用客服
    }
    
    // 会话结束
    public function sessionEnd($agentId)
    {
        if (isset($this->onlineAgents[$agentId])) {
            $this->onlineAgents[$agentId]['current']--;
        }
    }
}

2. 会话状态管理

<?php
class SessionManager
{
    protected $sessions = [];
    
    // 创建会话
    public function create($userId, $agentId)
    {
        $sessionId = uniqid();
        $this->sessions[$sessionId] = [
            'user_id' => $userId,
            'agent_id' => $agentId,
            'start_time' => time(),
            'messages' => []
        ];
        return $sessionId;
    }
    
    // 添加消息
    public function addMessage($sessionId, $message)
    {
        if (isset($this->sessions[$sessionId])) {
            $this->sessions[$sessionId]['messages'][] = $message;
        }
    }
    
    // 结束会话
    public function close($sessionId)
    {
        if (isset($this->sessions[$sessionId])) {
            $session = $this->sessions[$sessionId];
            unset($this->sessions[$sessionId]);
            return $session;
        }
        return false;
    }
}

扩展方向

  • 消息加密与安全传输
  • 多端消息同步
  • 客服评价系统
  • 聊天记录分析

©2023 ThinkPHP实战开发社区 | 原创内容转载请注明出处

ThinkPHP6+WebSocket实时消息推送系统开发实战 | 即时通讯解决方案
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 thinkphp ThinkPHP6+WebSocket实时消息推送系统开发实战 | 即时通讯解决方案 https://www.taomawang.com/server/thinkphp/781.html

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务