Python异步编程深度实战:构建高性能WebSocket实时数据推送系统

2025-09-29 0 484

发布日期:2024年1月 | 作者:Python高级工程师

实时数据推送系统架构设计

在现代Web应用中,实时数据推送已成为标配功能。传统轮询方式存在性能瓶颈,而WebSocket提供了真正的全双工通信能力。

系统架构组件

  • WebSocket服务器:基于asyncio的高并发连接管理
  • 消息广播系统:支持房间、分组和定向消息推送
  • 连接状态管理:心跳检测、断线重连和会话保持
  • 数据持久化:消息历史记录和离线消息处理
  • 监控统计:连接数、消息量和性能指标监控

Python异步编程核心概念

异步函数定义与执行

import asyncio
import websockets
import json
from datetime import datetime

class AsyncWebSocketServer:
    def __init__(self):
        self.connections = set()
        self.room_connections = {}
        self.user_connections = {}
        
    async def handle_connection(self, websocket, path):
        """处理WebSocket连接"""
        # 注册连接
        self.connections.add(websocket)
        
        try:
            # 处理客户端消息
            async for message in websocket:
                await self.process_message(websocket, message)
                
        except websockets.exceptions.ConnectionClosed:
            print("连接已关闭")
        finally:
            # 清理连接
            await self.cleanup_connection(websocket)

异步上下文管理器

class ConnectionManager:
    def __init__(self):
        self._lock = asyncio.Lock()
        self._connections = {}
        
    async def __aenter__(self):
        await self._lock.acquire()
        return self
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self._lock.release()
        
    async def add_connection(self, user_id, websocket):
        """添加用户连接"""
        async with self:
            if user_id not in self._connections:
                self._connections[user_id] = set()
            self._connections[user_id].add(websocket)
            
    async def remove_connection(self, user_id, websocket):
        """移除用户连接"""
        async with self:
            if user_id in self._connections:
                self._connections[user_id].discard(websocket)
                if not self._connections[user_id]:
                    del self._connections[user_id]

完整系统实现

1. WebSocket服务器核心

import asyncio
import websockets
import json
import logging
from typing import Set, Dict, Any
import redis.asyncio as redis

class RealTimeWebSocketServer:
    def __init__(self, host='localhost', port=8765):
        self.host = host
        self.port = port
        self.connections: Set[websockets.WebSocketServerProtocol] = set()
        self.rooms: Dict[str, Set[websockets.WebSocketServerProtocol]] = {}
        self.users: Dict[str, websockets.WebSocketServerProtocol] = {}
        self.redis = None
        
        # 设置日志
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        
    async def initialize_redis(self):
        """初始化Redis连接"""
        self.redis = await redis.Redis(
            host='localhost', 
            port=6379, 
            db=0, 
            decode_responses=True
        )
        
    async def broadcast_to_room(self, room: str, message: Dict[str, Any]):
        """向房间广播消息"""
        if room in self.rooms and self.rooms[room]:
            message_json = json.dumps(message)
            disconnected = set()
            
            # 并发发送消息
            tasks = []
            for connection in self.rooms[room]:
                try:
                    task = asyncio.create_task(
                        connection.send(message_json)
                    )
                    tasks.append(task)
                except websockets.exceptions.ConnectionClosed:
                    disconnected.add(connection)
                    
            # 等待所有发送任务完成
            if tasks:
                await asyncio.gather(*tasks, return_exceptions=True)
                
            # 清理断开连接的客户端
            for connection in disconnected:
                await self.leave_room(room, connection)
                
    async def join_room(self, room: str, websocket: websockets.WebSocketServerProtocol):
        """加入房间"""
        if room not in self.rooms:
            self.rooms[room] = set()
        self.rooms[room].add(websocket)
        
        # 记录加入事件
        join_message = {
            'type': 'system',
            'action': 'user_joined',
            'room': room,
            'timestamp': datetime.now().isoformat(),
            'online_count': len(self.rooms[room])
        }
        await self.broadcast_to_room(room, join_message)
        
    async def leave_room(self, room: str, websocket: websockets.WebSocketServerProtocol):
        """离开房间"""
        if room in self.rooms and websocket in self.rooms[room]:
            self.rooms[room].discard(websocket)
            
            # 如果房间为空,清理房间
            if not self.rooms[room]:
                del self.rooms[room]
            else:
                # 广播离开消息
                leave_message = {
                    'type': 'system',
                    'action': 'user_left',
                    'room': room,
                    'timestamp': datetime.now().isoformat(),
                    'online_count': len(self.rooms[room])
                }
                await self.broadcast_to_room(room, leave_message)
                
    async def handle_message(self, websocket: websockets.WebSocketServerProtocol, message: str):
        """处理客户端消息"""
        try:
            data = json.loads(message)
            message_type = data.get('type')
            
            if message_type == 'join_room':
                room = data.get('room')
                user_id = data.get('user_id')
                await self.join_room(room, websocket)
                self.users[user_id] = websocket
                
            elif message_type == 'chat_message':
                room = data.get('room')
                content = data.get('content')
                user_id = data.get('user_id')
                
                chat_message = {
                    'type': 'chat_message',
                    'room': room,
                    'content': content,
                    'user_id': user_id,
                    'timestamp': datetime.now().isoformat()
                }
                
                # 广播聊天消息
                await self.broadcast_to_room(room, chat_message)
                
                # 保存到Redis
                if self.redis:
                    await self.redis.rpush(
                        f"chat_history:{room}", 
                        json.dumps(chat_message)
                    )
                    # 只保留最近100条消息
                    await self.redis.ltrim(f"chat_history:{room}", 0, 99)
                    
            elif message_type == 'heartbeat':
                # 处理心跳包
                await websocket.send(json.dumps({
                    'type': 'heartbeat_ack',
                    'timestamp': datetime.now().isoformat()
                }))
                
        except json.JSONDecodeError:
            self.logger.error("消息格式错误")
        except Exception as e:
            self.logger.error(f"处理消息时出错: {e}")
            
    async def connection_handler(self, websocket: websockets.WebSocketServerProtocol, path: str):
        """连接处理主循环"""
        self.connections.add(websocket)
        self.logger.info(f"新连接建立,总连接数: {len(self.connections)}")
        
        try:
            # 发送欢迎消息
            welcome_msg = {
                'type': 'system',
                'message': '连接成功',
                'timestamp': datetime.now().isoformat()
            }
            await websocket.send(json.dumps(welcome_msg))
            
            # 处理消息循环
            async for message in websocket:
                await self.handle_message(websocket, message)
                
        except websockets.exceptions.ConnectionClosed:
            self.logger.info("连接正常关闭")
        except Exception as e:
            self.logger.error(f"连接处理异常: {e}")
        finally:
            # 清理连接
            await self.cleanup_connection(websocket)
            
    async def cleanup_connection(self, websocket: websockets.WebSocketServerProtocol):
        """清理连接资源"""
        self.connections.discard(websocket)
        
        # 从所有房间中移除
        for room in list(self.rooms.keys()):
            await self.leave_room(room, websocket)
            
        # 从用户映射中移除
        for user_id, conn in list(self.users.items()):
            if conn == websocket:
                del self.users[user_id]
                
        self.logger.info(f"连接清理完成,剩余连接数: {len(self.connections)}")
        
    async def start_server(self):
        """启动WebSocket服务器"""
        await self.initialize_redis()
        
        self.logger.info(f"启动WebSocket服务器在 {self.host}:{self.port}")
        
        async with websockets.serve(
            self.connection_handler, 
            self.host, 
            self.port,
            ping_interval=20,
            ping_timeout=60,
            close_timeout=10
        ):
            # 启动后台任务
            background_tasks = [
                asyncio.create_task(self.monitor_connections()),
                asyncio.create_task(self.cleanup_inactive_connections())
            ]
            
            await asyncio.Future()  # 永久运行
            
    async def monitor_connections(self):
        """监控连接状态"""
        while True:
            await asyncio.sleep(30)
            self.logger.info(
                f"连接统计 - 总连接: {len(self.connections)}, "
                f"房间数: {len(self.rooms)}, "
                f"用户数: {len(self.users)}"
            )
            
    async def cleanup_inactive_connections(self):
        """清理非活跃连接"""
        while True:
            await asyncio.sleep(60)
            # 这里可以实现更复杂的心跳检测逻辑

2. 客户端测试代码

import asyncio
import websockets
import json
import random

class WebSocketClient:
    def __init__(self, user_id, server_url="ws://localhost:8765"):
        self.user_id = user_id
        self.server_url = server_url
        self.websocket = None
        
    async def connect(self):
        """连接到WebSocket服务器"""
        self.websocket = await websockets.connect(self.server_url)
        print(f"用户 {self.user_id} 连接成功")
        
    async def join_room(self, room):
        """加入房间"""
        message = {
            'type': 'join_room',
            'room': room,
            'user_id': self.user_id
        }
        await self.websocket.send(json.dumps(message))
        print(f"用户 {self.user_id} 加入房间 {room}")
        
    async def send_message(self, room, content):
        """发送消息"""
        message = {
            'type': 'chat_message',
            'room': room,
            'content': content,
            'user_id': self.user_id
        }
        await self.websocket.send(json.dumps(message))
        print(f"用户 {self.user_id} 发送消息: {content}")
        
    async def listen_messages(self):
        """监听服务器消息"""
        try:
            async for message in self.websocket:
                data = json.loads(message)
                print(f"用户 {self.user_id} 收到消息: {data}")
        except websockets.exceptions.ConnectionClosed:
            print("连接已关闭")
            
    async def heartbeat(self):
        """发送心跳包"""
        while True:
            await asyncio.sleep(25)
            if self.websocket:
                try:
                    message = {
                        'type': 'heartbeat',
                        'user_id': self.user_id
                    }
                    await self.websocket.send(json.dumps(message))
                except:
                    break

async def simulate_multiple_clients():
    """模拟多个客户端同时连接"""
    clients = []
    room = "test_room"
    
    # 创建10个客户端
    for i in range(10):
        client = WebSocketClient(f"user_{i}")
        clients.append(client)
        
    # 并发连接
    await asyncio.gather(*[client.connect() for client in clients])
    
    # 全部加入房间
    await asyncio.gather(*[client.join_room(room) for client in clients])
    
    # 模拟消息发送
    tasks = []
    for client in clients:
        # 每个客户端发送5条消息
        for j in range(5):
            content = f"这是来自 {client.user_id} 的第 {j+1} 条消息"
            task = asyncio.create_task(
                client.send_message(room, content)
            )
            tasks.append(task)
            await asyncio.sleep(0.1)  # 稍微延迟
            
    # 等待所有消息发送完成
    await asyncio.gather(*tasks)
    
    # 保持连接一段时间
    await asyncio.sleep(10)
    
    # 关闭所有连接
    for client in clients:
        await client.websocket.close()

if __name__ == "__main__":
    # 启动服务器
    server = RealTimeWebSocketServer()
    
    # 在单独线程中运行服务器
    async def run_system():
        server_task = asyncio.create_task(server.start_server())
        
        # 等待服务器启动
        await asyncio.sleep(2)
        
        # 运行客户端测试
        await simulate_multiple_clients()
        
        # 取消服务器任务
        server_task.cancel()
        
    asyncio.run(run_system())

高级特性实现

1. 消息队列集成

import asyncio
from aioredis import Redis
from typing import Callable, Any
import json

class MessageQueueManager:
    def __init__(self, redis_client: Redis):
        self.redis = redis_client
        self.handlers = {}
        
    def register_handler(self, message_type: str, handler: Callable):
        """注册消息处理器"""
        self.handlers[message_type] = handler
        
    async def publish_message(self, channel: str, message: Dict[str, Any]):
        """发布消息到频道"""
        await self.redis.publish(channel, json.dumps(message))
        
    async def subscribe_channel(self, channel: str):
        """订阅频道并处理消息"""
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(channel)
        
        async for message in pubsub.listen():
            if message['type'] == 'message':
                data = json.loads(message['data'])
                await self.process_incoming_message(data)
                
    async def process_incoming_message(self, message: Dict[str, Any]):
        """处理接收到的消息"""
        message_type = message.get('type')
        if message_type in self.handlers:
            await self.handlers[message_type](message)

2. 连接负载监控

import psutil
import asyncio
from dataclasses import dataclass
from typing import Dict
import time

@dataclass
class SystemMetrics:
    connection_count: int
    memory_usage: float
    cpu_usage: float
    messages_per_second: int
    
class PerformanceMonitor:
    def __init__(self, server: RealTimeWebSocketServer):
        self.server = server
        self.metrics_history = []
        self.message_count = 0
        self.last_check_time = time.time()
        
    async def collect_metrics(self) -> SystemMetrics:
        """收集系统指标"""
        current_time = time.time()
        time_diff = current_time - self.last_check_time
        
        messages_per_second = (
            self.message_count / time_diff if time_diff > 0 else 0
        )
        
        metrics = SystemMetrics(
            connection_count=len(self.server.connections),
            memory_usage=psutil.virtual_memory().percent,
            cpu_usage=psutil.cpu_percent(interval=1),
            messages_per_second=int(messages_per_second)
        )
        
        # 重置计数器
        self.message_count = 0
        self.last_check_time = current_time
        
        return metrics
        
    async def start_monitoring(self):
        """启动监控循环"""
        while True:
            await asyncio.sleep(10)  # 每10秒收集一次
            
            metrics = await self.collect_metrics()
            self.metrics_history.append(metrics)
            
            # 只保留最近100条记录
            if len(self.metrics_history) > 100:
                self.metrics_history.pop(0)
                
            # 打印监控信息
            print(f"系统监控 - 连接数: {metrics.connection_count}, "
                  f"内存使用: {metrics.memory_usage}%, "
                  f"CPU使用: {metrics.cpu_usage}%, "
                  f"消息/秒: {metrics.messages_per_second}")

性能优化与最佳实践

连接池管理

from asyncio import Queue
from typing import Set
import asyncio

class ConnectionPool:
    def __init__(self, max_size=10000):
        self.max_size = max_size
        self.active_connections: Set[websockets.WebSocketServerProtocol] = set()
        self._lock = asyncio.Lock()
        
    async def add_connection(self, connection):
        """添加连接到连接池"""
        async with self._lock:
            if len(self.active_connections) >= self.max_size:
                raise Exception("连接池已满")
            self.active_connections.add(connection)
            
    async def remove_connection(self, connection):
        """从连接池移除连接"""
        async with self._lock:
            self.active_connections.discard(connection)
            
    async def broadcast(self, message: str, exclude: Set = None):
        """向所有连接广播消息"""
        if exclude is None:
            exclude = set()
            
        async with self._lock:
            tasks = []
            for connection in self.active_connections:
                if connection not in exclude:
                    try:
                        task = asyncio.create_task(
                            connection.send(message)
                        )
                        tasks.append(task)
                    except:
                        # 移除无效连接
                        await self.remove_connection(connection)
                        
            if tasks:
                await asyncio.gather(*tasks, return_exceptions=True)

内存优化建议

  • 使用连接池管理WebSocket连接
  • 实现消息压缩减少网络传输
  • 定期清理无效连接和过期数据
  • 使用异步数据库操作避免阻塞
  • 实现消息批处理减少IO操作

document.addEventListener(‘DOMContentLoaded’, function() {
const codeBlocks = document.querySelectorAll(‘pre code’);

codeBlocks.forEach(block => {
block.addEventListener(‘click’, function() {
const textArea = document.createElement(‘textarea’);
textArea.value = this.textContent;
document.body.appendChild(textArea);
textArea.select();

try {
document.execCommand(‘copy’);
console.log(‘代码已复制到剪贴板’);
} catch (err) {
console.error(‘复制失败:’, err);
}

document.body.removeChild(textArea);
});

block.title = ‘点击复制代码’;
});
});

Python异步编程深度实战:构建高性能WebSocket实时数据推送系统
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 python Python异步编程深度实战:构建高性能WebSocket实时数据推送系统 https://www.taomawang.com/server/python/1137.html

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务