Python异步革命:构建高性能WebSocket实时数据处理系统
一、架构设计
基于asyncio与websockets库的实时通信系统(示例代码未使用ASGI框架),设计目标:支持10万+并发连接,延迟低于50ms
二、核心实现
1. WebSocket服务端
# server.py
import asyncio
from websockets import serve
class RealTimeServer:
    """WebSocket server that relays every received message to all clients.

    Connection handlers put incoming messages on ``message_queue``; a single
    ``broadcast`` task drains the queue and sends each message to every
    currently registered client.
    """

    def __init__(self):
        # Connections that are currently registered for broadcasts.
        self.clients = set()
        # Messages waiting to be broadcast (filled by handlers).
        self.message_queue = asyncio.Queue()

    async def register(self, websocket):
        """Add *websocket* to the set of broadcast recipients."""
        self.clients.add(websocket)
        print(f"新客户端连接,总数: {len(self.clients)}")

    async def unregister(self, websocket):
        """Remove *websocket* from the broadcast recipients.

        Safe to call more than once for the same connection: ``broadcast``
        drops a client whose send failed and ``handler`` unregisters the same
        client again in its ``finally`` clause, so a second call must be a
        silent no-op instead of raising ``KeyError``.
        """
        if websocket not in self.clients:
            return
        self.clients.remove(websocket)
        print(f"客户端断开,剩余: {len(self.clients)}")

    async def broadcast(self):
        """Forever take messages off the queue and send them to all clients."""
        while True:
            message = await self.message_queue.get()
            # Iterate over a snapshot because unregister() mutates the set.
            for client in self.clients.copy():
                try:
                    await client.send(message)
                except Exception:
                    # A failed send means the client is unusable; drop it.
                    # ``Exception`` (not a bare except) lets task
                    # cancellation propagate so the loop can be stopped.
                    await self.unregister(client)

    async def handler(self, websocket):
        """Per-connection coroutine: queue every received message.

        The connection is registered for the lifetime of the receive loop
        and always unregistered afterwards, even if the loop raises.
        """
        await self.register(websocket)
        try:
            async for message in websocket:
                # Hand the client message over to the broadcast loop.
                await self.message_queue.put(message)
        finally:
            await self.unregister(websocket)

    async def start(self, host='0.0.0.0', port=8765):
        """Serve on ``host:port`` and run the broadcast loop until cancelled.

        ``self.handler`` is looked up here, so an instance-level override
        assigned before ``start()`` is the one that gets served.
        """
        async with serve(self.handler, host, port):
            # Keep a reference: the event loop only holds weak references to
            # tasks, so an unreferenced task may be garbage-collected.
            broadcast_task = asyncio.create_task(self.broadcast())
            print(f"服务启动在 ws://{host}:{port}")
            try:
                await asyncio.Future()  # run forever
            finally:
                # Stop broadcasting once the server is shutting down.
                broadcast_task.cancel()
if __name__ == "__main__":
    # Script entry point: build one server and run it with the default
    # host and port; start() never returns on its own.
    asyncio.run(RealTimeServer().start())
2. 消息处理器
# processor.py
import json
from datetime import datetime
class MessageProcessor:
    """Turn a raw client frame into the JSON string that gets broadcast."""

    @staticmethod
    async def process(raw_message):
        """Parse, timestamp and re-serialize one client message.

        Args:
            raw_message: JSON text (``str`` or ``bytes``) received from a
                client.

        Returns:
            A JSON string. On success it is the decoded object with a
            ``timestamp`` key added (and ``processed: true`` for ``chat``
            messages). On any failure it is an object with ``error`` and
            ``original`` keys. The method always returns a string; it never
            returns ``None``.
        """
        try:
            message = json.loads(raw_message)
            message['timestamp'] = datetime.now().isoformat()
            # Business logic per message type.
            if message.get('type') == 'chat':
                message['processed'] = True
            # Every message type is serialized and returned; previously only
            # 'chat' returned a value and all other types fell through to an
            # implicit None.
            return json.dumps(message)
        except Exception as e:
            # Per-message boundary: report the failure to the caller as data
            # instead of tearing down the connection.
            original = raw_message
            if isinstance(original, (bytes, bytearray)):
                # bytes are not JSON-serializable; decode leniently so the
                # error report itself cannot fail.
                original = bytes(original).decode('utf-8', errors='replace')
            return json.dumps({
                'error': str(e),
                'original': original,
            })
三、高级特性
1. 连接负载均衡
# balancer.py
import asyncio
from collections import defaultdict
class ConnectionBalancer:
    """Least-connections balancer that adds a server when the pool is full.

    Args:
        max_connections: Per-server connection count at which a server is
            considered full.
        server_factory: Zero-argument callable that builds a new server for
            ``scale_out``. Defaults to ``RealTimeServer`` (resolved when
            ``scale_out`` runs).
    """

    def __init__(self, max_connections=1000, server_factory=None):
        self.server_instances = []
        # server -> number of connections tracked for it
        self.connection_counts = defaultdict(int)
        self.max_connections = max_connections
        self.server_factory = server_factory
        # Strong references to the tasks running scaled-out servers, so they
        # are not garbage-collected while still running.
        self._server_tasks = []

    async def add_server(self, server):
        """Register *server* as a candidate for new connections."""
        self.server_instances.append(server)

    async def get_best_server(self):
        """Return the registered server with the fewest tracked connections.

        Raises:
            ValueError: If no server has been registered.
        """
        if not self.server_instances:
            raise ValueError("no servers registered with the balancer")
        return min(
            self.server_instances,
            key=lambda s: self.connection_counts.get(s, 0),
        )

    async def track_connection(self, server):
        """Count one more connection on *server*; scale out if the pool is full.

        A new server is started only when every registered server has
        reached ``max_connections``. Otherwise each connection past the
        limit would spawn yet another server.
        """
        self.connection_counts[server] += 1
        if self.connection_counts[server] < self.max_connections:
            return
        pool_is_full = all(
            self.connection_counts.get(s, 0) >= self.max_connections
            for s in self.server_instances
        )
        if pool_is_full:
            await self.scale_out()

    async def scale_out(self):
        """Create, register and start one additional server on a free port."""
        factory = self.server_factory
        if factory is None:
            factory = RealTimeServer
        new_server = factory()
        await self.add_server(new_server)
        # Use the server's own port finder when it has one; otherwise ask
        # the OS for a free port.
        port_finder = getattr(new_server, "find_available_port", None)
        if callable(port_finder):
            port = port_finder()
        else:
            port = self._find_available_port()
        task = asyncio.create_task(new_server.start(port=port))
        self._server_tasks.append(task)

    @staticmethod
    def _find_available_port(host=""):
        """Return a TCP port that was free at the time of the call."""
        import socket

        # Binding to port 0 lets the OS choose an unused port. The port is
        # released again before the server binds it, so a small race window
        # remains.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.bind((host, 0))
            return probe.getsockname()[1]
2. 消息持久化
# storage.py
import aiofiles
from aiokafka import AIOKafkaProducer
class MessageStorage:
    """Persist messages to Kafka (when connected) and to a local log file.

    Args:
        bootstrap_servers: Kafka bootstrap address used by ``connect_kafka``.
        topic: Kafka topic that messages are sent to.
        log_path: Path of the local append-only log file.
    """

    def __init__(self, bootstrap_servers='localhost:9092',
                 topic='websocket_messages', log_path='messages.log'):
        # Stays None until connect_kafka() succeeds; store_message() then
        # writes to the log file only.
        self.kafka_producer = None
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.log_path = log_path

    async def connect_kafka(self):
        """Create and start the Kafka producer."""
        producer = AIOKafkaProducer(
            bootstrap_servers=self.bootstrap_servers)
        await producer.start()
        # Publish the producer only after start() succeeded, so a failed
        # connection never leaves a non-started producer for store_message().
        self.kafka_producer = producer

    async def close(self):
        """Stop the Kafka producer if one is running; otherwise do nothing."""
        producer, self.kafka_producer = self.kafka_producer, None
        if producer is not None:
            await producer.stop()

    async def store_message(self, message):
        """Send *message* to Kafka (if connected) and append it to the log.

        Args:
            message: The message as ``str`` or ``bytes``.

        Raises:
            TypeError: If *message* is neither ``str`` nor ``bytes``.
        """
        if isinstance(message, (bytes, bytearray)):
            payload = bytes(message)
            text = payload.decode('utf-8', errors='replace')
        elif isinstance(message, str):
            text = message
            payload = message.encode()
        else:
            raise TypeError(
                f"message must be str or bytes, not {type(message).__name__}"
            )
        # Write to Kafka.
        if self.kafka_producer is not None:
            await self.kafka_producer.send(self.topic, payload)
        # Also append to the local file, one message per line. The original
        # f"{message}n" had lost its backslash and wrote a literal "n".
        async with aiofiles.open(self.log_path, 'a', encoding='utf-8') as f:
            await f.write(f"{text}\n")
四、完整案例
# 整合所有组件
async def main():
    """Wire server, processor and storage together and serve until stopped.

    Every received message is processed, persisted, and then queued for
    broadcast. The Kafka producer is stopped when the server exits.
    """
    # Set up the server pool.
    balancer = ConnectionBalancer()
    server1 = RealTimeServer()
    await balancer.add_server(server1)

    # Start the Kafka producer.
    storage = MessageStorage()
    await storage.connect_kafka()

    # Replacement connection handler that adds processing and storage.
    async def enhanced_handler(websocket):
        await server1.register(websocket)
        try:
            async for message in websocket:
                processed = await MessageProcessor.process(message)
                if processed is None:
                    # Guard against a processor that yields no payload:
                    # there is nothing to store or broadcast, and passing
                    # None on would abort this connection.
                    continue
                await storage.store_message(processed)
                await server1.message_queue.put(processed)
        finally:
            await server1.unregister(websocket)

    # Start serving with the enhanced handler.
    server1.handler = enhanced_handler
    try:
        await server1.start()
    finally:
        # Stop the producer on shutdown so it is not left running.
        if storage.kafka_producer is not None:
            await storage.kafka_producer.stop()
if __name__ == "__main__":
    # Script entry point: run the integrated example.
    app = main()
    asyncio.run(app)