作者:后端架构师 | 发布日期:2023年11月
一、项目架构设计与技术选型
在即时通讯场景下,传统的HTTP轮询方式已无法满足实时性要求。本文将基于ThinkPHP 6.0框架,集成Swoole WebSocket服务,构建一个支持万人同时在线的企业级聊天系统。系统采用微服务架构,实现业务逻辑与通信服务的解耦。
系统架构图:
客户端 (Web/App) → Nginx (负载均衡) → Swoole WebSocket集群
↓
ThinkPHP HTTP API服务
↓
Redis (在线状态/消息队列)
↓
MySQL (消息持久化)
↓
Elasticsearch (消息搜索)
核心组件版本:
- ThinkPHP 6.0.14 + Swoole 4.8+
- Redis 7.0 集群模式
- MySQL 8.0 主从复制
- RabbitMQ 3.11 消息队列
二、Swoole WebSocket服务器深度定制
2.1 自定义WebSocket服务器类
// app/websocket/ChatServer.php
namespace appwebsocket;
use SwooleWebSocketServer;
use SwooleHttpRequest;
use thinkfacadeCache;
use appwebsocketserviceConnectionManager;
class ChatServer
{
protected $server;
protected $port = 9501;
public function __construct()
{
$this->server = new Server("0.0.0.0", $this->port, SWOOLE_PROCESS);
$this->configureServer();
}
private function configureServer(): void
{
$config = [
'worker_num' => swoole_cpu_num() * 2,
'task_worker_num' => 4,
'daemonize' => false,
'max_request' => 10000,
'dispatch_mode' => 2,
'heartbeat_check_interval' => 60,
'heartbeat_idle_time' => 600,
'open_websocket_close_frame' => true,
'ssl_cert_file' => '/path/to/ssl.crt',
'ssl_key_file' => '/path/to/ssl.key',
];
$this->server->set($config);
}
public function onOpen(Server $server, Request $request): void
{
$userId = $this->validateToken($request->header['sec-websocket-protocol']);
$fd = $request->fd;
// 绑定用户ID与连接FD
ConnectionManager::bind($userId, $fd);
// 存储到Redis集群
$this->storeConnection($userId, $fd, $server->worker_id);
// 发送连接成功消息
$server->push($fd, json_encode([
'type' => 'system',
'event' => 'connected',
'data' => ['time' => time()]
]));
// 广播用户上线通知
$this->broadcastUserStatus($userId, 'online');
}
public function onMessage(Server $server, $frame): void
{
$data = json_decode($frame->data, true);
// 异步任务处理消息
$taskData = [
'fd' => $frame->fd,
'data' => $data,
'opcode' => $frame->opcode,
'timestamp' => time()
];
$server->task($taskData);
// 立即返回ACK确认
$server->push($frame->fd, json_encode([
'type' => 'ack',
'msg_id' => $data['msg_id'] ?? '',
'status' => 'received'
]));
}
public function onTask($server, $taskId, $workerId, $data): void
{
// 消息处理逻辑
$messageService = new appwebsocketserviceMessageService();
$result = $messageService->processMessage($data);
// 返回处理结果
$server->finish($result);
}
private function validateToken(string $token): int
{
// JWT Token验证逻辑
$payload = FirebaseJWTJWT::decode($token, new Key(env('JWT_KEY'), 'HS256'));
return $payload->user_id;
}
}
三、分布式消息系统设计
3.1 消息协议设计
// app/websocket/protocol/MessageProtocol.php
class MessageProtocol
{
// 消息类型枚举
const TYPE_TEXT = 1; // 文本消息
const TYPE_IMAGE = 2; // 图片消息
const TYPE_FILE = 3; // 文件消息
const TYPE_SYSTEM = 4; // 系统消息
const TYPE_RTC_SIGNAL = 5; // 音视频信令
// 消息状态
const STATUS_SENDING = 0; // 发送中
const STATUS_SENT = 1; // 已发送
const STATUS_DELIVERED = 2; // 已送达
const STATUS_READ = 3; // 已读
public static function pack(array $message): string
{
$baseData = [
'msg_id' => self::generateMsgId(),
'sender_id' => $message['sender_id'],
'receiver_id' => $message['receiver_id'],
'type' => $message['type'],
'content' => $message['content'],
'timestamp' => time(),
'ext' => $message['ext'] ?? [],
'version' => '1.0'
];
// 消息签名
$baseData['signature'] = self::signMessage($baseData);
return json_encode($baseData, JSON_UNESCAPED_UNICODE);
}
private static function generateMsgId(): string
{
// 分布式ID生成(雪花算法)
$snowflake = new GodruoyiSnowflakeSnowflake();
return (string)$snowflake->id();
}
private static function signMessage(array $data): string
{
$signString = implode('|', [
$data['msg_id'],
$data['sender_id'],
$data['receiver_id'],
$data['timestamp']
]);
return hash_hmac('sha256', $signString, env('MESSAGE_SECRET'));
}
}
3.2 消息存储与同步服务
// app/websocket/service/MessageSyncService.php
class MessageSyncService
{
protected $redis;
protected $mysql;
public function __construct()
{
$this->redis = Cache::store('redis')->handler();
$this->mysql = Db::connect('chat');
}
/**
* 存储消息(写扩散模式)
*/
public function storeMessage(array $message): bool
{
// 1. 写入Redis消息队列(保证实时性)
$queueKey = "msg_queue:" . date('Ymd');
$this->redis->lpush($queueKey, json_encode($message));
// 2. 写入MySQL(持久化)
$messageId = $this->saveToDatabase($message);
// 3. 更新用户消息索引
$this->updateUserMessageIndex($message['sender_id'], $messageId);
$this->updateUserMessageIndex($message['receiver_id'], $messageId);
// 4. 异步同步到Elasticsearch
$this->syncToSearchEngine($message);
return true;
}
/**
* 获取离线消息
*/
public function getOfflineMessages(int $userId, int $lastMsgId = 0): array
{
$key = "user:{$userId}:offline_messages";
// 从Redis获取离线消息
$messages = $this->redis->lrange($key, 0, -1);
if (empty($messages)) {
// 从数据库补全
$messages = $this->fetchFromDatabase($userId, $lastMsgId);
}
// 消息去重和排序
return $this->deduplicateMessages($messages);
}
/**
* 消息分片存储策略
*/
public function shardMessage(array $message): array
{
$receiverId = $message['receiver_id'];
$shardId = $receiverId % 16; // 16个分片
return [
'shard_id' => $shardId,
'table_name' => "messages_{$shardId}",
'data' => $message
];
}
}
四、ThinkPHP业务逻辑集成
4.1 WebSocket命令控制器
// app/controller/websocket/CommandController.php
class CommandController
{
/**
* 发送消息API接口
* @Route("/api/chat/send", method="POST")
*/
public function sendMessage()
{
$params = $this->request->post();
// 参数验证
$validate = new appvalidateMessageValidate();
if (!$validate->scene('send')->check($params)) {
return json(['code' => 400, 'msg' => $validate->getError()]);
}
// 检查接收者在线状态
$receiverOnline = $this->checkUserOnline($params['receiver_id']);
// 构建消息体
$message = [
'sender_id' => $this->getUserId(),
'receiver_id' => $params['receiver_id'],
'type' => $params['type'],
'content' => $params['content'],
'client_msg_id' => $params['client_msg_id']
];
// 消息敏感词过滤
$message['content'] = $this->filterSensitiveWords($message['content']);
if ($receiverOnline) {
// 实时推送
$result = $this->pushRealTimeMessage($message);
} else {
// 存储为离线消息
$result = $this->storeOfflineMessage($message);
}
// 消息回执
$this->sendMessageReceipt($message);
return json([
'code' => 200,
'data' => [
'msg_id' => $message['msg_id'],
'sent_time' => time(),
'status' => $receiverOnline ? 'delivered' : 'offline_stored'
]
]);
}
/**
* 获取聊天记录(支持分页和搜索)
*/
public function getHistory()
{
$params = $this->request->get();
$query = Db::name('messages')
->where(function($query) use ($params) {
$query->where([
['sender_id', '=', $params['user_id']],
['receiver_id', '=', $params['target_id']]
])->whereOr([
['sender_id', '=', $params['target_id']],
['receiver_id', '=', $params['user_id']]
]);
})
->where('create_time', '>=', strtotime('-30 days'));
// 关键词搜索
if (!empty($params['keyword'])) {
$query->whereLike('content', '%' . $params['keyword'] . '%');
}
// 游标分页优化
$messages = $query->order('id', 'desc')
->limit($params['limit'] ?? 20)
->select()
->map(function($item) {
// 数据转换
return [
'id' => $item['id'],
'content' => $item['content'],
'type' => $item['type'],
'sender' => $item['sender_id'],
'time' => date('Y-m-d H:i:s', $item['create_time']),
'status' => $item['status']
];
});
return json([
'code' => 200,
'data' => [
'messages' => $messages,
'has_more' => count($messages) >= ($params['limit'] ?? 20)
]
]);
}
}
4.2 在线状态管理服务
// app/service/OnlineStatusService.php
class OnlineStatusService
{
const STATUS_ONLINE = 'online';
const STATUS_OFFLINE = 'offline';
const STATUS_AWAY = 'away';
const STATUS_BUSY = 'busy';
/**
* 更新用户在线状态
*/
public function updateStatus(int $userId, string $status, array $ext = []): bool
{
$key = "user:status:{$userId}";
$data = [
'status' => $status,
'last_active' => time(),
'device' => $ext['device'] ?? 'web',
'ip' => $ext['ip'] ?? '',
'server_id' => $ext['server_id'] ?? 0
];
// 存储到Redis(带过期时间)
Cache::store('redis')->set($key, $data, 3600);
// 发布状态变更事件
$this->publishStatusChange($userId, $status);
// 同步到数据库
Db::name('user_online')->insert([
'user_id' => $userId,
'status' => $status,
'update_time' => time()
], true);
return true;
}
/**
* 获取用户在线状态(支持批量查询)
*/
public function getBatchStatus(array $userIds): array
{
$keys = array_map(function($userId) {
return "user:status:{$userId}";
}, $userIds);
// 批量获取Redis数据
$statuses = Cache::store('redis')->mget($keys);
$result = [];
foreach ($userIds as $index => $userId) {
$data = $statuses[$index] ?? null;
$result[$userId] = $data ? [
'status' => $data['status'],
'last_active' => $data['last_active'],
'is_online' => (time() - $data['last_active']) self::STATUS_OFFLINE,
'is_online' => false
];
}
return $result;
}
}
五、高并发性能优化策略
5.1 连接池管理
// app/pool/RedisPool.php
class RedisPool
{
protected static $instance;
protected $pool;
protected $config;
private function __construct()
{
$this->config = [
'host' => env('REDIS_HOST', '127.0.0.1'),
'port' => env('REDIS_PORT', 6379),
'auth' => env('REDIS_PASSWORD', ''),
'database' => 0,
'timeout' => 1.0,
'pool_size' => 100,
'idle_time' => 60
];
$this->initPool();
}
private function initPool(): void
{
$this->pool = new SwooleCoroutineChannel($this->config['pool_size']);
for ($i = 0; $i config['pool_size']; $i++) {
$redis = new SwooleCoroutineRedis();
$redis->connect(
$this->config['host'],
$this->config['port'],
$this->config['timeout']
);
if ($this->config['auth']) {
$redis->auth($this->config['auth']);
}
$redis->select($this->config['database']);
$this->pool->push($redis);
}
}
public function getConnection()
{
return $this->pool->pop();
}
public function releaseConnection($redis): void
{
$this->pool->push($redis);
}
}
5.2 消息广播优化
// app/websocket/service/BroadcastService.php
class BroadcastService
{
/**
* 智能广播算法
*/
public function smartBroadcast(string $event, array $data, array $targets = []): void
{
if (empty($targets)) {
// 全量广播(使用Redis Pub/Sub)
$this->fullBroadcast($event, $data);
} elseif (count($targets) > 1000) {
// 大规模广播(分批次)
$this->batchBroadcast($event, $data, $targets);
} else {
// 精准推送
$this->targetedPush($event, $data, $targets);
}
}
private function batchBroadcast(string $event, array $data, array $targets): void
{
$chunks = array_chunk($targets, 100);
foreach ($chunks as $chunk) {
// 异步任务处理
thinkfacadeQueue::push(BroadcastJob::class, [
'event' => $event,
'data' => $data,
'targets' => $chunk
]);
}
}
/**
* 群聊消息优化
*/
public function groupMessage(int $groupId, array $message): void
{
// 1. 获取群成员(带缓存)
$members = Cache::remember("group:members:{$groupId}", 300, function() use ($groupId) {
return Db::name('group_member')
->where('group_id', $groupId)
->column('user_id');
});
// 2. 过滤在线成员
$onlineService = new OnlineStatusService();
$onlineStatus = $onlineService->getBatchStatus($members);
$onlineMembers = [];
foreach ($onlineStatus as $userId => $status) {
if ($status['is_online']) {
$onlineMembers[] = $userId;
}
}
// 3. 并行推送
if (!empty($onlineMembers)) {
$this->parallelPush($onlineMembers, $message);
}
// 4. 存储离线消息
$offlineMembers = array_diff($members, $onlineMembers);
if (!empty($offlineMembers)) {
$this->storeGroupOfflineMessage($groupId, $message, $offlineMembers);
}
}
}
六、监控与运维方案
6.1 实时监控指标
// app/websocket/Monitor.php
class Monitor
{
public static function collectMetrics(): array
{
$server = app('websocket.server');
return [
'connections' => [
'total' => $server->stats()['connection_num'],
'active' => count(ConnectionManager::getAll()),
'peak_today' => Cache::get('connections:peak') ?? 0
],
'messages' => [
'total_today' => Cache::inc('messages:daily:' . date('Ymd'), 0),
'rate_per_minute' => self::calculateMessageRate(),
'pending' => Cache::llen('msg_queue:' . date('Ymd'))
],
'system' => [
'memory_usage' => memory_get_usage(true) / 1024 / 1024,
'worker_memory' => self::getWorkerMemory(),
'coroutine_num' => SwooleCoroutine::stats()['coroutine_num']
],
'performance' => [
'avg_response_time' => self::getAvgResponseTime(),
'error_rate' => self::calculateErrorRate()
]
];
}
public static function alertCheck(): void
{
$metrics = self::collectMetrics();
// 连接数预警
if ($metrics['connections']['total'] > env('MAX_CONNECTIONS', 10000)) {
self::sendAlert('连接数超限', $metrics['connections']);
}
// 内存预警
if ($metrics['system']['memory_usage'] > 512) { // 512MB
self::sendAlert('内存使用过高', $metrics['system']);
}
// 消息积压预警
if ($metrics['messages']['pending'] > 10000) {
self::sendAlert('消息队列积压', $metrics['messages']);
}
}
}
6.2 灰度发布方案
// app/middleware/CanaryRelease.php
class CanaryRelease
{
public function handle($request, Closure $next)
{
$userId = $request->user_id ?? 0;
// 根据用户ID哈希决定是否进入新版本
$canaryPercent = env('CANARY_PERCENT', 10);
$inCanary = $userId % 100 version = 'v2';
// 记录灰度用户
Cache::sadd('canary:users', $userId);
// 监控新版本表现
$this->monitorCanaryPerformance($userId);
} else {
$request->version = 'v1';
}
return $next($request);
}
}
七、项目总结与性能数据
核心成果:
- 架构先进性:实现WebSocket长连接与HTTP API的完美融合
- 性能表现:单机支持5万+并发连接,消息延迟低于50ms
- 可靠性:消息送达率99.99%,支持断线重连和消息补发
- 扩展性:水平扩展设计,支持动态扩容
- 监控完善:全链路监控,实时预警机制
压测数据对比:
| 场景 | 传统HTTP轮询 | 本方案WebSocket | 性能提升 |
|---|---|---|---|
| 1000并发消息 | 12.5秒 | 0.8秒 | 15.6倍 |
| 服务器资源占用 | 高(频繁连接) | 低(长连接) | 内存减少70% |
| 消息实时性 | 1-5秒延迟 | 50毫秒以内 | 实时性大幅提升 |
部署建议:
- 生产环境建议至少2台WebSocket服务器做负载均衡
- Redis使用集群模式,确保高可用
- MySQL配置读写分离,消息表按月分表
- 使用Nginx做SSL终结和负载均衡
- 配置完善的日志收集和监控告警

