发布日期:2024年1月 | 作者:Python高级工程师
实时数据推送系统架构设计
在现代Web应用中,实时数据推送已成为标配功能。传统轮询方式存在性能瓶颈,而WebSocket提供了真正的全双工通信能力。
系统架构组件
- WebSocket服务器:基于asyncio的高并发连接管理
- 消息广播系统:支持房间、分组和定向消息推送
- 连接状态管理:心跳检测、断线重连和会话保持
- 数据持久化:消息历史记录和离线消息处理
- 监控统计:连接数、消息量和性能指标监控
Python异步编程核心概念
异步函数定义与执行
import asyncio
import websockets
import json
from datetime import datetime
class AsyncWebSocketServer:
    """Minimal asyncio WebSocket server skeleton that tracks open connections.

    ``handle_connection`` calls two coroutine methods, ``process_message``
    and ``cleanup_connection``, that are not defined in this class; they have
    to be supplied (for example by a subclass) before the handler is used.
    """

    def __init__(self):
        # Every websocket currently being served.
        self.connections = set()
        # Not used by the code shown here; presumably room -> connections.
        self.room_connections = {}
        # Not used by the code shown here; presumably user -> connections.
        self.user_connections = {}

    async def handle_connection(self, websocket, path=None):
        """Serve one client until its connection ends.

        Args:
            websocket: Connection object; iterated asynchronously to receive
                messages.
            path: Request path. Unused. It defaults to ``None`` so the
                handler works both with ``websockets`` versions that pass
                ``(websocket, path)`` and with versions that pass only the
                connection.
        """
        # Register the connection before reading anything from it.
        self.connections.add(websocket)
        try:
            async for message in websocket:
                await self.process_message(websocket, message)
        except websockets.exceptions.ConnectionClosed:
            print("连接已关闭")
        finally:
            # Runs on clean close, on error and on cancellation alike.
            await self.cleanup_connection(websocket)
异步上下文管理器
class ConnectionManager:
    """Registry of websockets grouped by user id, guarded by an asyncio lock.

    The instance is its own async context manager: entering it acquires the
    internal lock and leaving it releases the lock again.
    """

    def __init__(self):
        self._connections = {}
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        await self._lock.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self._lock.release()

    async def add_connection(self, user_id, websocket):
        """Record ``websocket`` as one of ``user_id``'s connections."""
        async with self:
            self._connections.setdefault(user_id, set()).add(websocket)

    async def remove_connection(self, user_id, websocket):
        """Forget ``websocket``; drop the user entry once it has no sockets."""
        async with self:
            sockets = self._connections.get(user_id)
            if sockets is None:
                return
            sockets.discard(websocket)
            if not sockets:
                del self._connections[user_id]
完整系统实现
1. WebSocket服务器核心
import asyncio
import websockets
import json
import logging
from typing import Set, Dict, Any
import redis.asyncio as redis
class RealTimeWebSocketServer:
    """WebSocket server with rooms, a user map and optional Redis chat history.

    Clients send JSON objects whose ``type`` is ``join_room``,
    ``chat_message`` or ``heartbeat``; any other type is ignored.
    """

    def __init__(self, host='localhost', port=8765):
        """Create empty connection tables and configure logging.

        Args:
            host: Interface passed to ``websockets.serve``.
            port: TCP port passed to ``websockets.serve``.
        """
        self.host = host
        self.port = port
        # Every open connection, whether or not it has joined a room.
        self.connections: Set[websockets.WebSocketServerProtocol] = set()
        # Room name -> connections currently in that room.
        self.rooms: Dict[str, Set[websockets.WebSocketServerProtocol]] = {}
        # User id -> connection that last sent join_room with that id.
        self.users: Dict[str, websockets.WebSocketServerProtocol] = {}
        # Set by initialize_redis(); chat history is skipped while None.
        self.redis = None
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    async def initialize_redis(self):
        """Create the Redis client used for chat history."""
        self.redis = await redis.Redis(
            host='localhost',
            port=6379,
            db=0,
            decode_responses=True
        )

    async def broadcast_to_room(self, room: str, message: Dict[str, Any]) -> None:
        """Send ``message`` as JSON to every connection in ``room``.

        Connections whose send fails are removed from the room afterwards.
        A room that does not exist or is empty is a no-op.
        """
        members = self.rooms.get(room)
        if not members:
            return

        payload = json.dumps(message)
        # Snapshot the members: the set can change while sends are awaited.
        targets = list(members)
        # Send failures are raised inside the awaited sends, so they can only
        # be observed through the gathered results.
        outcomes = await asyncio.gather(
            *(connection.send(payload) for connection in targets),
            return_exceptions=True,
        )

        failed = [
            connection
            for connection, outcome in zip(targets, outcomes)
            if isinstance(outcome, Exception)
        ]
        for connection in failed:
            self.logger.warning("向房间 %s 发送消息失败,移除连接", room)
            await self.leave_room(room, connection)

    async def join_room(self, room: str, websocket: "websockets.WebSocketServerProtocol") -> None:
        """Add ``websocket`` to ``room`` and announce it to the room."""
        if room not in self.rooms:
            self.rooms[room] = set()
        self.rooms[room].add(websocket)

        join_message = {
            'type': 'system',
            'action': 'user_joined',
            'room': room,
            'timestamp': datetime.now().isoformat(),
            'online_count': len(self.rooms[room])
        }
        await self.broadcast_to_room(room, join_message)

    async def leave_room(self, room: str, websocket: "websockets.WebSocketServerProtocol") -> None:
        """Remove ``websocket`` from ``room``; a non-member is a no-op.

        An emptied room is deleted; otherwise the remaining members are told
        that someone left.
        """
        if room in self.rooms and websocket in self.rooms[room]:
            self.rooms[room].discard(websocket)

            if not self.rooms[room]:
                del self.rooms[room]
            else:
                leave_message = {
                    'type': 'system',
                    'action': 'user_left',
                    'room': room,
                    'timestamp': datetime.now().isoformat(),
                    'online_count': len(self.rooms[room])
                }
                await self.broadcast_to_room(room, leave_message)

    async def handle_message(self, websocket: "websockets.WebSocketServerProtocol", message: str) -> None:
        """Decode one client frame and act on its ``type``.

        Malformed input (invalid JSON, a non-object payload, or a room
        message without ``room``) is logged and dropped. Any other failure
        is logged too, so a single bad frame never ends the connection loop.
        """
        try:
            data = json.loads(message)
            if not isinstance(data, dict):
                self.logger.error("消息格式错误")
                return
            message_type = data.get('type')

            if message_type == 'join_room':
                room = data.get('room')
                user_id = data.get('user_id')
                if room is None:
                    # Without this guard a room keyed by None would be created.
                    self.logger.error("消息格式错误")
                    return
                await self.join_room(room, websocket)
                if user_id is not None:
                    self.users[user_id] = websocket

            elif message_type == 'chat_message':
                room = data.get('room')
                if room is None:
                    self.logger.error("消息格式错误")
                    return
                chat_message = {
                    'type': 'chat_message',
                    'room': room,
                    'content': data.get('content'),
                    'user_id': data.get('user_id'),
                    'timestamp': datetime.now().isoformat()
                }
                await self.broadcast_to_room(room, chat_message)

                if self.redis is not None:
                    history_key = f"chat_history:{room}"
                    await self.redis.rpush(history_key, json.dumps(chat_message))
                    # rpush appends at the tail, so the newest 100 entries
                    # are the last 100; trimming 0..99 would keep the oldest.
                    await self.redis.ltrim(history_key, -100, -1)

            elif message_type == 'heartbeat':
                await websocket.send(json.dumps({
                    'type': 'heartbeat_ack',
                    'timestamp': datetime.now().isoformat()
                }))

        except json.JSONDecodeError:
            self.logger.error("消息格式错误")
        except Exception as e:
            self.logger.error(f"处理消息时出错: {e}")

    async def connection_handler(self, websocket: "websockets.WebSocketServerProtocol", path: str = None) -> None:
        """Serve one connection: greet it, process frames, then clean up.

        Args:
            websocket: The accepted connection.
            path: Request path. Unused. It defaults to ``None`` so the
                handler works with ``websockets`` versions that pass only
                the connection as well as those that pass
                ``(websocket, path)``.
        """
        self.connections.add(websocket)
        self.logger.info(f"新连接建立,总连接数: {len(self.connections)}")

        try:
            welcome_msg = {
                'type': 'system',
                'message': '连接成功',
                'timestamp': datetime.now().isoformat()
            }
            await websocket.send(json.dumps(welcome_msg))

            async for message in websocket:
                await self.handle_message(websocket, message)

        except websockets.exceptions.ConnectionClosed:
            self.logger.info("连接正常关闭")
        except Exception as e:
            self.logger.error(f"连接处理异常: {e}")
        finally:
            await self.cleanup_connection(websocket)

    async def cleanup_connection(self, websocket: "websockets.WebSocketServerProtocol") -> None:
        """Remove ``websocket`` from the connection set, rooms and user map."""
        self.connections.discard(websocket)

        # Iterate a copy: leave_room deletes rooms that become empty.
        for room in list(self.rooms):
            await self.leave_room(room, websocket)

        stale_users = [
            user_id for user_id, conn in self.users.items() if conn == websocket
        ]
        for user_id in stale_users:
            del self.users[user_id]

        self.logger.info(f"连接清理完成,剩余连接数: {len(self.connections)}")

    async def start_server(self) -> None:
        """Initialise Redis, start listening and run until cancelled."""
        await self.initialize_redis()

        self.logger.info(f"启动WebSocket服务器在 {self.host}:{self.port}")

        async with websockets.serve(
            self.connection_handler,
            self.host,
            self.port,
            ping_interval=20,
            ping_timeout=60,
            close_timeout=10
        ):
            background_tasks = [
                asyncio.create_task(self.monitor_connections()),
                asyncio.create_task(self.cleanup_inactive_connections())
            ]
            try:
                await asyncio.Future()  # never completes; ends only by cancellation
            finally:
                # Stop the helper loops together with the server instead of
                # leaving them pending after this coroutine exits.
                for task in background_tasks:
                    task.cancel()
                await asyncio.gather(*background_tasks, return_exceptions=True)

    async def monitor_connections(self) -> None:
        """Log connection, room and user counts every 30 seconds."""
        while True:
            await asyncio.sleep(30)
            self.logger.info(
                f"连接统计 - 总连接: {len(self.connections)}, "
                f"房间数: {len(self.rooms)}, "
                f"用户数: {len(self.users)}"
            )

    async def cleanup_inactive_connections(self) -> None:
        """Placeholder loop for inactive-connection cleanup.

        It currently only sleeps in 60-second steps; heartbeat-based
        eviction is intended to be added here.
        """
        while True:
            await asyncio.sleep(60)
2. 客户端测试代码
import asyncio
import websockets
import json
import random
class WebSocketClient:
    """Test client that talks the server's JSON protocol for one user id."""

    def __init__(self, user_id, server_url="ws://localhost:8765"):
        self.user_id = user_id
        self.server_url = server_url
        # Set by connect(); None until then.
        self.websocket = None

    async def connect(self):
        """Open the connection to ``server_url``."""
        self.websocket = await websockets.connect(self.server_url)
        print(f"用户 {self.user_id} 连接成功")

    async def join_room(self, room):
        """Send a ``join_room`` request for ``room``."""
        message = {
            'type': 'join_room',
            'room': room,
            'user_id': self.user_id
        }
        await self.websocket.send(json.dumps(message))
        print(f"用户 {self.user_id} 加入房间 {room}")

    async def send_message(self, room, content):
        """Send a ``chat_message`` with ``content`` to ``room``."""
        message = {
            'type': 'chat_message',
            'room': room,
            'content': content,
            'user_id': self.user_id
        }
        await self.websocket.send(json.dumps(message))
        print(f"用户 {self.user_id} 发送消息: {content}")

    async def listen_messages(self):
        """Print every message received until the connection closes."""
        try:
            async for message in self.websocket:
                data = json.loads(message)
                print(f"用户 {self.user_id} 收到消息: {data}")
        except websockets.exceptions.ConnectionClosed:
            print("连接已关闭")

    async def heartbeat(self, interval=25):
        """Send a ``heartbeat`` message every ``interval`` seconds.

        The loop ends the first time a send fails. While ``websocket`` is
        still ``None`` it keeps waiting without sending.

        Args:
            interval: Seconds to sleep between heartbeats (default 25).
        """
        while True:
            await asyncio.sleep(interval)
            if self.websocket is None:
                continue
            message = {
                'type': 'heartbeat',
                'user_id': self.user_id
            }
            try:
                await self.websocket.send(json.dumps(message))
            except Exception as exc:
                # Exception, not a bare except: task cancellation must still
                # propagate instead of being swallowed here.
                print(f"用户 {self.user_id} 心跳停止: {exc}")
                break
async def simulate_multiple_clients():
    """Drive ten clients through connect, join, chat and disconnect.

    Each client joins ``test_room`` and queues five chat messages, with a
    0.1 s pause after each one is scheduled. The connections are then kept
    open for 10 s before being closed one by one.
    """
    room = "test_room"
    clients = [WebSocketClient(f"user_{index}") for index in range(10)]

    # Connect everyone concurrently, then have everyone join the room.
    await asyncio.gather(*(client.connect() for client in clients))
    await asyncio.gather(*(client.join_room(room) for client in clients))

    pending_sends = []
    for sender in clients:
        for j in range(5):
            content = f"这是来自 {sender.user_id} 的第 {j+1} 条消息"
            pending_sends.append(
                asyncio.create_task(sender.send_message(room, content))
            )
            await asyncio.sleep(0.1)  # small gap between scheduled sends

    await asyncio.gather(*pending_sends)

    # Hold the connections open for a while before closing them.
    await asyncio.sleep(10)

    for client in clients:
        await client.websocket.close()
if __name__ == "__main__":
    server = RealTimeWebSocketServer()

    async def run_system():
        """Run the server and the client simulation in one event loop.

        The server runs as a background task (not a separate thread) and is
        cancelled and awaited once the simulation ends, even if it failed.
        """
        server_task = asyncio.create_task(server.start_server())

        # Crude readiness wait: give the server time to start listening.
        await asyncio.sleep(2)

        try:
            await simulate_multiple_clients()
        finally:
            server_task.cancel()
            # Await the cancelled task so its cleanup finishes and any error
            # it raised earlier is surfaced instead of silently discarded.
            try:
                await server_task
            except asyncio.CancelledError:
                pass

    asyncio.run(run_system())
高级特性实现
1. 消息队列集成
import asyncio
from aioredis import Redis
from typing import Callable, Any
import json
class MessageQueueManager:
    """Route Redis pub/sub messages to handlers registered per message type."""

    def __init__(self, redis_client: "Redis"):
        self.redis = redis_client
        # Message ``type`` value -> handler callable.
        self.handlers = {}

    def register_handler(self, message_type: str, handler: Callable):
        """Register ``handler`` for ``message_type``, replacing any previous one.

        ``handler`` is called with the decoded message dict. It may be a
        plain function or a coroutine function.
        """
        self.handlers[message_type] = handler

    async def publish_message(self, channel: str, message: Dict[str, Any]):
        """Publish ``message`` to ``channel`` as a JSON string."""
        await self.redis.publish(channel, json.dumps(message))

    async def subscribe_channel(self, channel: str):
        """Subscribe to ``channel`` and dispatch each published message.

        Frames whose ``type`` is not ``'message'`` are skipped. Payloads
        that are not JSON objects are logged and skipped, so one bad
        publisher cannot stop the subscription.
        """
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(channel)

        async for message in pubsub.listen():
            if message['type'] != 'message':
                continue
            try:
                data = json.loads(message['data'])
            except (TypeError, ValueError):
                # json.JSONDecodeError is a ValueError; TypeError covers
                # payloads that are neither str nor bytes.
                logging.getLogger(__name__).warning(
                    "频道 %s 收到无法解析的消息: %r", channel, message['data']
                )
                continue
            if not isinstance(data, dict):
                logging.getLogger(__name__).warning(
                    "频道 %s 收到无法解析的消息: %r", channel, message['data']
                )
                continue
            await self.process_incoming_message(data)

    async def process_incoming_message(self, message: Dict[str, Any]):
        """Call the handler registered for ``message['type']``, if any."""
        import inspect

        handler = self.handlers.get(message.get('type'))
        if handler is None:
            return
        result = handler(message)
        # Await only when the handler actually returned an awaitable, so
        # plain functions work as well as coroutine functions.
        if inspect.isawaitable(result):
            await result
2. 连接负载监控
import psutil
import asyncio
from dataclasses import dataclass
from typing import Dict
import time
@dataclass
class SystemMetrics:
    """One sample of server load figures."""

    # Number of open connections at sampling time.
    connection_count: int
    # Memory usage; the monitor in this file prints it with a "%" suffix.
    memory_usage: float
    # CPU usage; the monitor in this file prints it with a "%" suffix.
    cpu_usage: float
    # Message rate over the sampling window, in whole messages per second.
    messages_per_second: int
class PerformanceMonitor:
    """Periodically sample connection count, memory, CPU and message rate.

    Nothing in this class increments ``message_count``; presumably the code
    that handles messages is expected to do so.
    """

    def __init__(self, server: "RealTimeWebSocketServer"):
        self.server = server
        # Most recent samples, oldest first; capped at 100 by start_monitoring().
        self.metrics_history = []
        # Messages counted since the previous sample.
        self.message_count = 0
        self.last_check_time = time.time()

    async def collect_metrics(self) -> "SystemMetrics":
        """Take one sample and reset the message counter.

        Returns:
            A ``SystemMetrics`` whose message rate covers the time since the
            previous sample.
        """
        current_time = time.time()
        time_diff = current_time - self.last_check_time

        # Snapshot and reset the counter before the first await, so messages
        # counted while the CPU sample is in progress go to the next window.
        counted = self.message_count
        self.message_count = 0
        self.last_check_time = current_time
        messages_per_second = counted / time_diff if time_diff > 0 else 0

        # cpu_percent(interval=1) blocks for a second; run it in the default
        # executor so the event loop keeps serving connections meanwhile.
        loop = asyncio.get_running_loop()
        cpu_usage = await loop.run_in_executor(None, psutil.cpu_percent, 1)

        return SystemMetrics(
            connection_count=len(self.server.connections),
            memory_usage=psutil.virtual_memory().percent,
            cpu_usage=cpu_usage,
            messages_per_second=int(messages_per_second)
        )

    async def start_monitoring(self):
        """Collect and print a sample every 10 seconds, forever."""
        while True:
            await asyncio.sleep(10)

            metrics = await self.collect_metrics()
            self.metrics_history.append(metrics)

            # Keep only the 100 most recent samples.
            if len(self.metrics_history) > 100:
                del self.metrics_history[:-100]

            print(f"系统监控 - 连接数: {metrics.connection_count}, "
                  f"内存使用: {metrics.memory_usage}%, "
                  f"CPU使用: {metrics.cpu_usage}%, "
                  f"消息/秒: {metrics.messages_per_second}")
性能优化与最佳实践
连接池管理
from asyncio import Queue
from typing import Set
import asyncio
class ConnectionPool:
    """Bounded set of active connections with a broadcast helper."""

    def __init__(self, max_size=10000):
        """Create an empty pool that holds at most ``max_size`` connections."""
        self.max_size = max_size
        self.active_connections: Set[websockets.WebSocketServerProtocol] = set()
        self._lock = asyncio.Lock()

    async def add_connection(self, connection):
        """Add ``connection`` to the pool.

        Raises:
            Exception: If the pool already holds ``max_size`` connections.
        """
        async with self._lock:
            if len(self.active_connections) >= self.max_size:
                raise Exception("连接池已满")
            self.active_connections.add(connection)

    async def remove_connection(self, connection):
        """Remove ``connection`` from the pool; an unknown one is a no-op."""
        async with self._lock:
            self.active_connections.discard(connection)

    async def broadcast(self, message: str, exclude: Set = None):
        """Send ``message`` to every pooled connection not in ``exclude``.

        Connections whose send fails are removed from the pool.

        Args:
            message: Payload passed to each connection's ``send``.
            exclude: Connections to skip; ``None`` means skip nothing.
        """
        skipped = exclude if exclude is not None else ()

        # Hold the lock only to snapshot the recipients. asyncio.Lock is not
        # reentrant, so remove_connection() must not be called while it is
        # held, and the set must not change while it is being iterated.
        async with self._lock:
            targets = [
                connection
                for connection in self.active_connections
                if connection not in skipped
            ]
        if not targets:
            return

        # Send failures only show up in the gathered results.
        outcomes = await asyncio.gather(
            *(connection.send(message) for connection in targets),
            return_exceptions=True,
        )
        for connection, outcome in zip(targets, outcomes):
            if isinstance(outcome, Exception):
                await self.remove_connection(connection)
内存优化建议
- 使用连接池管理WebSocket连接
- 实现消息压缩减少网络传输
- 定期清理无效连接和过期数据
- 使用异步数据库操作避免阻塞
- 实现消息批处理减少IO操作
// Click-to-copy for code samples: once the DOM is ready, every `pre code`
// element gets a click handler that copies its text to the clipboard.
// String delimiters are plain ASCII quotes; typographic quotes are a
// syntax error in JavaScript.
document.addEventListener('DOMContentLoaded', function() {
  const codeBlocks = document.querySelectorAll('pre code');
  codeBlocks.forEach(block => {
    block.addEventListener('click', function() {
      // execCommand('copy') copies the current selection, so the text is
      // placed in a temporary textarea and selected first.
      const textArea = document.createElement('textarea');
      textArea.value = this.textContent;
      document.body.appendChild(textArea);
      textArea.select();
      try {
        document.execCommand('copy');
        console.log('代码已复制到剪贴板');
      } catch (err) {
        console.error('复制失败:', err);
      }
      // Remove the helper element whether or not the copy succeeded.
      document.body.removeChild(textArea);
    });
    block.title = '点击复制代码';
  });
});