Python异步编程实战:基于FastAPI构建高性能WebSocket实时应用

2025-10-27 0 615

发布日期:2024年1月17日 | 作者:Python高级工程师

一、异步编程基础概念

Python的异步编程模型通过asyncio库提供了强大的并发处理能力,特别适合I/O密集型应用。FastAPI作为现代Python Web框架,原生支持异步操作,结合WebSocket协议可以构建高性能的实时应用程序。

异步编程核心优势:

  • 高并发处理:单线程处理数千个并发连接
  • 资源高效:相比多线程,内存占用大幅降低
  • 实时响应:毫秒级的消息推送延迟
  • 易于扩展:连接状态保存在各进程内存中,借助Redis Pub/Sub等消息总线在多实例间同步消息后,即可实现水平扩展

技术栈选择:

核心框架:FastAPI + WebSocket
异步运行时:uvicorn
数据序列化:Pydantic
消息队列:Redis Pub/Sub
数据库:PostgreSQL + asyncpg
缓存:Redis
部署:Docker + Kubernetes

二、系统架构设计

1. 整体架构图

客户端 (Web/Mobile)
    ↓
负载均衡器 (Nginx)
    ↓
FastAPI WebSocket 服务器集群
    ↓
Redis 消息总线 (Pub/Sub)
    ↓
PostgreSQL 数据库 (异步驱动)
    ↓
外部服务集成 (第三方API)

2. 项目结构设计

realtime-app/
├── app/
│   ├── __init__.py
│   ├── main.py                 # 应用入口
│   ├── core/
│   │   ├── config.py           # 配置管理
│   │   └── security.py         # 安全认证
│   ├── api/
│   │   └── websocket.py        # WebSocket路由
│   ├── models/
│   │   ├── message.py          # 消息模型
│   │   └── user.py             # 用户模型
│   ├── services/
│   │   ├── connection_manager.py # 连接管理
│   │   ├── message_service.py  # 消息服务
│   │   └── redis_service.py    # Redis服务
│   └── schemas/
│       ├── message.py          # 消息序列化
│       └── user.py             # 用户序列化
├── requirements.txt
├── Dockerfile
└── docker-compose.yml

3. 数据库设计

-- Users table: one row per account; username and email must each be unique.
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    -- TIMESTAMP stores no time zone; presumably values are UTC -- TODO confirm.
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Messages table: every chat message, tagged with the room it was sent to.
CREATE TABLE messages (
    id SERIAL PRIMARY KEY,
    room_id VARCHAR(100) NOT NULL,
    -- Nullable foreign key to the author in users.
    user_id INTEGER REFERENCES users(id),
    content TEXT NOT NULL,
    message_type VARCHAR(20) DEFAULT 'text',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Room history is read with "WHERE room_id = ? ORDER BY created_at DESC
-- LIMIT ? OFFSET ?"; this index lets that query avoid scanning and sorting
-- the whole table.
CREATE INDEX idx_messages_room_id_created_at
    ON messages (room_id, created_at DESC);

-- Connection log table: one row per WebSocket connection to a room.
CREATE TABLE connections (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id),
    -- Presumably the UUID generated by the WebSocket endpoint -- TODO confirm.
    connection_id VARCHAR(100) NOT NULL,
    room_id VARCHAR(100) NOT NULL,
    connected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- NULL until a disconnect time is recorded.
    disconnected_at TIMESTAMP NULL
);

三、核心功能实现

1. 应用配置与启动

# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import uvicorn

from app.api import websocket
from app.core.config import settings

app = FastAPI(
    title="实时聊天应用",
    description="基于FastAPI和WebSocket的高性能实时应用",
    version="1.0.0"
)

# CORS configuration.
# Credentialed requests must not be combined with a wildcard origin list:
# that would let any site make authenticated cross-origin calls. Credentials
# are therefore enabled only when ALLOWED_HOSTS names explicit origins.
_allow_all_origins = "*" in settings.ALLOWED_HOSTS
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.ALLOWED_HOSTS,
    allow_credentials=not _allow_all_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Register the WebSocket routes under the /ws prefix.
app.include_router(websocket.router, prefix="/ws", tags=["websocket"])

@app.get("/")
async def root():
    """Return a static status payload for the site root."""
    status_payload = {"message": "实时聊天服务运行中"}
    return status_payload

if __name__ == "__main__":
    # uvicorn's auto-reload and multi-worker modes are mutually exclusive, so
    # pass exactly one of them: reload while debugging, workers otherwise.
    run_options = {"host": "0.0.0.0", "port": 8000}
    if settings.DEBUG:
        run_options["reload"] = True
    else:
        run_options["workers"] = settings.WORKERS

    uvicorn.run("app.main:app", **run_options)

2. 连接管理器

# app/services/connection_manager.py
import json
import asyncio
from typing import Dict, List, Set
from fastapi import WebSocket

class ConnectionManager:
    """In-process registry of WebSocket connections and their room memberships.

    All state lives in plain dictionaries on the instance, so it is only
    visible inside the process that owns the instance.
    """

    def __init__(self):
        # connection_id -> accepted WebSocket
        self.active_connections: Dict[str, "WebSocket"] = {}
        # room_id -> ids of the connections currently in that room
        self.room_connections: Dict[str, Set[str]] = {}
        # connection_id -> ids of the rooms that connection has joined
        self.user_rooms: Dict[str, Set[str]] = {}

    async def connect(self, websocket: "WebSocket", connection_id: str):
        """Accept the WebSocket handshake and register it under connection_id."""
        await websocket.accept()
        self.active_connections[connection_id] = websocket

    def disconnect(self, connection_id: str):
        """Forget a connection and remove it from every room it joined.

        Safe to call more than once for the same id.
        """
        self.active_connections.pop(connection_id, None)

        # user_rooms is the reverse index maintained by join_room/leave_room,
        # so only the rooms this connection actually joined are touched.
        for room_id in self.user_rooms.pop(connection_id, set()):
            self._remove_from_room(connection_id, room_id)

    def _remove_from_room(self, connection_id: str, room_id: str):
        """Drop one member from a room and delete the room once it is empty."""
        members = self.room_connections.get(room_id)
        if members is None:
            return
        members.discard(connection_id)
        if not members:
            del self.room_connections[room_id]

    async def join_room(self, connection_id: str, room_id: str):
        """Add a connection to a room, creating the room on first use."""
        self.room_connections.setdefault(room_id, set()).add(connection_id)
        # Keep the reverse index in sync so disconnect() can clean up.
        self.user_rooms.setdefault(connection_id, set()).add(room_id)

    async def leave_room(self, connection_id: str, room_id: str):
        """Remove a connection from a single room; unknown ids are ignored."""
        self._remove_from_room(connection_id, room_id)

        joined = self.user_rooms.get(connection_id)
        if joined is not None:
            joined.discard(room_id)
            if not joined:
                del self.user_rooms[connection_id]

    async def send_personal_message(self, message: str, connection_id: str):
        """Send text to one connection; do nothing if the id is not registered."""
        websocket = self.active_connections.get(connection_id)
        if websocket is not None:
            await websocket.send_text(message)

    async def broadcast_to_room(self, message: str, room_id: str, exclude_connections: Set[str] = None):
        """Send text to every member of a room except the excluded ids.

        Members whose send raises are treated as dead and disconnected after
        the broadcast finishes.
        """
        members = self.room_connections.get(room_id)
        if not members:
            return

        excluded = exclude_connections or ()

        # Iterate over a snapshot: each send awaits, and other coroutines may
        # join or leave the room meanwhile. Mutating a set while iterating it
        # raises RuntimeError.
        targets = [cid for cid in members if cid not in excluded]

        failed = []
        for connection_id in targets:
            try:
                await self.send_personal_message(message, connection_id)
            except Exception:
                # Best effort: one broken socket must not stop the broadcast.
                failed.append(connection_id)

        # Clean up the clients whose send failed.
        for connection_id in failed:
            self.disconnect(connection_id)

    def get_room_users_count(self, room_id: str) -> int:
        """Return the number of connections currently in the room (0 if unknown)."""
        return len(self.room_connections.get(room_id, ()))

# Module-level connection manager instance shared by importers.
connection_manager = ConnectionManager()

3. Redis消息服务

# app/services/redis_service.py
import redis.asyncio as redis
import json
import asyncio
from app.core.config import settings

class RedisService:
    """Thin async wrapper around a Redis client for pub/sub and presence keys.

    The client is created by connect(). Until then, every method except
    subscribe_channel() is a no-op (or returns an empty result).
    """

    def __init__(self):
        self.redis_client = None

    async def connect(self):
        """Create the Redis client from the application settings."""
        self.redis_client = redis.Redis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            password=settings.REDIS_PASSWORD,
            decode_responses=True
        )

    async def disconnect(self):
        """Close the Redis client if one was created."""
        if self.redis_client:
            await self.redis_client.close()

    async def publish_message(self, channel: str, message: dict):
        """Publish message, JSON-encoded, on channel."""
        if self.redis_client:
            await self.redis_client.publish(channel, json.dumps(message))

    async def subscribe_channel(self, channel: str, message_handler):
        """Subscribe to channel and await message_handler(data) for each message.

        Connects first if needed. Runs until the listener stops or the task
        is cancelled.
        """
        if not self.redis_client:
            await self.connect()

        # The context manager releases the pub/sub connection even when the
        # loop ends through an exception or task cancellation.
        async with self.redis_client.pubsub() as pubsub:
            await pubsub.subscribe(channel)

            async for message in pubsub.listen():
                # Skip subscribe/unsubscribe confirmations.
                if message['type'] == 'message':
                    data = json.loads(message['data'])
                    await message_handler(data)

    async def set_user_online(self, user_id: str, connection_data: dict):
        """Store connection_data under the user's presence key with a TTL."""
        if self.redis_client:
            key = f"user:{user_id}:online"
            await self.redis_client.setex(
                key,
                settings.USER_ONLINE_TTL,
                json.dumps(connection_data)
            )

    async def get_online_users(self, user_ids: list[str]) -> list[dict]:
        """Return the stored presence data of those user_ids that are online.

        Users without a presence key are omitted; input order is preserved.
        """
        if not self.redis_client or not user_ids:
            # MGET rejects an empty key list, so answer that case locally.
            return []

        keys = [f"user:{user_id}:online" for user_id in user_ids]
        # One MGET round trip instead of one GET per user.
        values = await self.redis_client.mget(keys)
        return [json.loads(value) for value in values if value]

# Module-level Redis service instance shared by importers.
redis_service = RedisService()

4. WebSocket路由处理器

# app/api/websocket.py
import json
import uuid
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from app.services.connection_manager import connection_manager
from app.services.redis_service import redis_service
from app.schemas.message import MessageCreate, MessageResponse

router = APIRouter()

@router.websocket("/chat/{room_id}")
async def websocket_chat_endpoint(websocket: WebSocket, room_id: str):
    """Serve one chat client for the lifetime of its WebSocket connection.

    Registers the connection under a fresh UUID, joins it to room_id, greets
    the client, announces the join to the rest of the room, then dispatches
    incoming JSON frames by their "type" field until the socket closes.
    """
    connection_id = str(uuid.uuid4())

    await connection_manager.connect(websocket, connection_id)
    await connection_manager.join_room(connection_id, room_id)

    try:
        # Tell the new client that it is connected.
        welcome_message = {
            "type": "system",
            "content": f"欢迎加入房间 {room_id}",
            "user_count": connection_manager.get_room_users_count(room_id)
        }
        await connection_manager.send_personal_message(
            json.dumps(welcome_message),
            connection_id
        )

        # Announce the join to everyone else in the room.
        join_message = {
            "type": "user_join",
            "content": "新用户加入房间",
            "user_count": connection_manager.get_room_users_count(room_id)
        }
        await connection_manager.broadcast_to_room(
            json.dumps(join_message),
            room_id,
            {connection_id}
        )

        # Message loop.
        while True:
            data = await websocket.receive_text()

            # Frames come from an untrusted client. Reject anything that is
            # not a JSON object instead of letting the exception end the
            # handler.
            try:
                message_data = json.loads(data)
            except json.JSONDecodeError:
                message_data = None

            if not isinstance(message_data, dict):
                await connection_manager.send_personal_message(
                    json.dumps({"type": "error", "content": "消息格式无效"}),
                    connection_id
                )
                continue

            # Dispatch on the message type; unknown types are ignored.
            message_type = message_data.get("type", "text")

            if message_type == "text":
                await handle_text_message(
                    message_data, room_id, connection_id
                )
            elif message_type == "typing":
                await handle_typing_indicator(
                    message_data, room_id, connection_id
                )

    except WebSocketDisconnect:
        # Normal client close; cleanup happens in the finally block.
        pass
    finally:
        # Deregister on every exit path, not only on a clean disconnect, so
        # an unexpected error cannot leave a dead connection in the manager.
        await handle_disconnect(connection_id, room_id)

async def handle_text_message(message_data: dict, room_id: str, connection_id: str):
    """Broadcast a client's text message to its room and publish it to Redis.

    Messages whose "content" is missing, not a string, or blank are dropped
    silently. The "sender" and "timestamp" fields are copied from the client
    payload as-is and are not verified here.
    """
    raw_content = message_data.get("content")
    # The payload is client-controlled: a non-string value such as a number
    # or null would otherwise fail on .strip().
    if not isinstance(raw_content, str):
        return

    message_content = raw_content.strip()
    if not message_content:
        return

    # Build the outgoing message.
    message_response = {
        "type": "message",
        "content": message_content,
        "sender": message_data.get("sender", "匿名用户"),
        "timestamp": message_data.get("timestamp"),
        "message_id": str(uuid.uuid4())
    }

    # Deliver to every connection in the room, the sender included.
    await connection_manager.broadcast_to_room(
        json.dumps(message_response),
        room_id
    )

    # Publish to Redis so that other server instances can relay the message.
    await redis_service.publish_message(
        f"room:{room_id}",
        {
            "type": "message_broadcast",
            "message": message_response,
            "exclude_connection": connection_id
        }
    )

async def handle_typing_indicator(message_data: dict, room_id: str, connection_id: str):
    """Relay a typing-status update to every other connection in the room."""
    payload = json.dumps({
        "type": "typing",
        "user": message_data.get("sender"),
        "is_typing": message_data.get("is_typing", False),
    })

    # The sender does not need to see its own typing indicator.
    skip_sender = {connection_id}
    await connection_manager.broadcast_to_room(payload, room_id, skip_sender)

async def handle_disconnect(connection_id: str, room_id: str):
    """Deregister a connection and tell the room that a user left."""
    # Remove the connection first. The cleanup then cannot be skipped by a
    # failing broadcast, and the user count below is read from the real room
    # state rather than guessed as "current - 1" (which is off by one when
    # the connection was already removed after a failed send).
    connection_manager.disconnect(connection_id)

    # Announce the departure to the remaining members.
    leave_message = {
        "type": "user_leave",
        "content": "用户离开房间",
        "user_count": connection_manager.get_room_users_count(room_id)
    }

    await connection_manager.broadcast_to_room(
        json.dumps(leave_message),
        room_id,
        {connection_id}
    )

四、高级特性与优化

1. 消息持久化服务

# app/services/message_service.py
import asyncio
from datetime import datetime
from typing import List, Optional
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from app.models.message import Message
from app.models.user import User
from app.schemas.message import MessageCreate, MessageResponse

class MessageService:
    """Store chat messages and read back room history via an async session."""

    def __init__(self, db: AsyncSession):
        self.db = db

    @staticmethod
    def _to_response(message, user_id, username) -> MessageResponse:
        """Combine a Message row with its author's details into a response."""
        return MessageResponse(
            id=message.id,
            content=message.content,
            message_type=message.message_type,
            room_id=message.room_id,
            user_id=user_id,
            username=username,
            created_at=message.created_at,
        )

    async def create_message(
        self,
        message_data: MessageCreate,
        user_id: int
    ) -> MessageResponse:
        """Insert a message written by user_id and return it with the username."""
        record = Message(
            room_id=message_data.room_id,
            user_id=user_id,
            content=message_data.content,
            message_type=message_data.message_type,
        )

        self.db.add(record)
        await self.db.commit()
        # Reload the row so that database-generated columns are populated.
        await self.db.refresh(record)

        # Look up the author to include the username in the response.
        author = await self.db.get(User, user_id)

        return self._to_response(record, user_id, author.username)

    async def get_room_messages(
        self,
        room_id: str,
        limit: int = 50,
        offset: int = 0
    ) -> List[MessageResponse]:
        """Return one page of a room's history in chronological order."""
        statement = select(Message, User)
        statement = statement.join(User, Message.user_id == User.id)
        statement = statement.where(Message.room_id == room_id)
        statement = statement.order_by(Message.created_at.desc())
        statement = statement.limit(limit).offset(offset)

        rows = (await self.db.execute(statement)).all()

        # The query pages from the newest message backwards; flip the page so
        # callers receive oldest-first.
        return [
            self._to_response(message, user.id, user.username)
            for message, user in reversed(rows)
        ]

2. 性能优化配置

# app/core/config.py
from pydantic_settings import BaseSettings
from typing import List

class Settings(BaseSettings):
    """Application settings, loaded from the environment and the `.env` file."""

    # pydantic v2 configuration. This replaces the nested `class Config`,
    # which pydantic v2 still accepts but reports as deprecated.
    model_config = {"env_file": ".env"}

    # Application
    APP_NAME: str = "实时聊天应用"
    DEBUG: bool = False
    WORKERS: int = 4

    # Database
    DATABASE_URL: str = "postgresql+asyncpg://user:password@localhost/realtime_chat"

    # Redis
    REDIS_HOST: str = "localhost"
    REDIS_PORT: int = 6379
    REDIS_PASSWORD: str = ""

    # WebSocket
    # NOTE(review): these three values are not referenced by the code shown
    # alongside this module -- confirm where they are applied.
    WEBSOCKET_PING_INTERVAL: int = 20
    WEBSOCKET_PING_TIMEOUT: int = 30
    WEBSOCKET_MAX_SIZE: int = 1024 * 1024  # 1 MB

    # Security
    ALLOWED_HOSTS: List[str] = ["*"]
    USER_ONLINE_TTL: int = 300  # 5 minutes, in seconds

# Module-level settings instance shared by importers.
settings = Settings()

五、部署与性能测试

1. Docker容器化部署

# Dockerfile
# Base image: slim variant of the official Python 3.11 image.
FROM python:3.11-slim

# Relative paths in the following instructions resolve against /app.
WORKDIR /app

# Copy the dependency list on its own first, so the install layer below is
# rebuilt only when requirements.txt changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the build context (the application code).
COPY . .

# Serve the app with uvicorn on all interfaces, port 8000, 4 worker processes.
# NOTE(review): there is no USER instruction, so the process presumably runs
# as root -- confirm this is intended.
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

# docker-compose.yml
# Compose stack: the web application plus its PostgreSQL and Redis backends.
version: '3.8'
services:
  # The application, built from the Dockerfile in this directory.
  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      # Hostnames "db" and "redis" are the service names defined below.
      - DATABASE_URL=postgresql+asyncpg://user:password@db/realtime_chat
      - REDIS_HOST=redis
    # NOTE(review): without a health-check condition this presumably only
    # orders container start-up and does not wait for readiness -- confirm.
    depends_on:
      - db
      - redis
  
  # PostgreSQL; credentials match the DATABASE_URL given to the web service.
  db:
    image: postgres:13
    environment:
      - POSTGRES_DB=realtime_chat
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
  
  # Redis with append-only-file persistence enabled.
  redis:
    image: redis:6-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data

# Named volumes that keep database and Redis data across container restarts.
volumes:
  postgres_data:
  redis_data:

2. 性能测试脚本

# tests/performance_test.py
import asyncio
import websockets
import json
import time
import statistics

class PerformanceTester:
    """Open many concurrent chat clients and report latency statistics.

    Each simulated client connects to the `test-room` chat endpoint, sends
    one text message and waits until the server echoes that message back.
    """

    def __init__(self, server_url: str, num_clients: int = 100,
                 response_timeout: float = 10.0):
        """
        Args:
            server_url: WebSocket base URL, e.g. "ws://localhost:8000".
            num_clients: Number of clients to run concurrently.
            response_timeout: Seconds to wait for a client's own message to
                come back before that client is counted as failed.
        """
        self.server_url = server_url
        self.num_clients = num_clients
        self.response_timeout = response_timeout
        # Send-to-echo round-trip time of each successful client, in seconds.
        self.latencies = []
        # Time taken to establish each connection, in seconds. Kept apart
        # from the round-trip times so the two are not averaged together.
        self.connect_latencies = []

    @staticmethod
    async def _await_echo(websocket, sent: dict):
        """Consume frames until the server's echo of `sent` arrives."""
        # The server sends a welcome frame on connect and may interleave
        # join notices and other clients' messages, so the first frame
        # received is generally not the reply to our own message.
        while True:
            reply = json.loads(await websocket.recv())
            if (
                reply.get("type") == "message"
                and reply.get("sender") == sent["sender"]
                and reply.get("content") == sent["content"]
            ):
                return

    async def simulate_client(self, client_id: int):
        """Run one client; failures are printed rather than raised."""
        try:
            # perf_counter is monotonic, unlike the wall clock.
            start_time = time.perf_counter()

            async with websockets.connect(
                f"{self.server_url}/ws/chat/test-room"
            ) as websocket:
                self.connect_latencies.append(time.perf_counter() - start_time)

                # Send the test message.
                message = {
                    "type": "text",
                    "content": f"测试消息来自客户端 {client_id}",
                    "sender": f"user_{client_id}"
                }

                send_start = time.perf_counter()
                await websocket.send(json.dumps(message))

                # Wait for our own message to be broadcast back.
                await asyncio.wait_for(
                    self._await_echo(websocket, message),
                    timeout=self.response_timeout,
                )
                self.latencies.append(time.perf_counter() - send_start)

                await asyncio.sleep(1)

        except Exception as e:
            print(f"客户端 {client_id} 错误: {e}")

    async def run_test(self):
        """Run all clients concurrently and print a summary."""
        print(f"开始性能测试,模拟 {self.num_clients} 个客户端...")

        start_time = time.perf_counter()

        # Run every client concurrently.
        await asyncio.gather(
            *(self.simulate_client(i) for i in range(self.num_clients)),
            return_exceptions=True,
        )

        total_time = time.perf_counter() - start_time

        # Print the results.
        print("\n=== 性能测试结果 ===")
        print(f"总客户端数: {self.num_clients}")
        print(f"成功客户端数: {len(self.latencies)}")
        print(f"总测试时间: {total_time:.2f} 秒")

        if self.connect_latencies:
            print(f"平均连接耗时: {statistics.mean(self.connect_latencies):.3f} 秒")

        # mean/max/min all raise on an empty sequence, which is what happens
        # when every client failed (e.g. the server is not running).
        if self.latencies:
            print(f"平均延迟: {statistics.mean(self.latencies):.3f} 秒")
            print(f"最大延迟: {max(self.latencies):.3f} 秒")
            print(f"最小延迟: {min(self.latencies):.3f} 秒")
        else:
            print("没有成功的请求,无法计算延迟统计")

        if total_time > 0:
            print(f"吞吐量: {self.num_clients / total_time:.2f} 客户端/秒")

# Script entry point.
async def main():
    """Run a 100-client test against the server on localhost:8000."""
    await PerformanceTester("ws://localhost:8000", num_clients=100).run_test()

if __name__ == "__main__":
    asyncio.run(main())

3. 监控与日志

# app/core/monitoring.py
import time
import logging
from contextlib import asynccontextmanager
from prometheus_client import Counter, Histogram, generate_latest

# Metric definitions.

# Counter of WebSocket connections, labelled by room.
websocket_connections = Counter(
    name='websocket_connections_total',
    documentation='WebSocket连接总数',
    labelnames=['room_id'],
)

# Counter of sent messages, labelled by room and message type.
messages_sent = Counter(
    name='messages_sent_total',
    documentation='发送消息总数',
    labelnames=['room_id', 'message_type'],
)

# Histogram of message handling durations, labelled by message type.
message_processing_time = Histogram(
    name='message_processing_seconds',
    documentation='消息处理时间',
    labelnames=['message_type'],
)

@asynccontextmanager
async def track_message_processing(message_type: str):
    """Time the wrapped block and record it in the processing-time histogram.

    The observation is recorded even when the wrapped block raises.

    Args:
        message_type: Value for the histogram's `message_type` label.
    """
    # perf_counter is monotonic. The wall clock can be stepped (for example
    # by an NTP adjustment), which would produce wrong or negative durations.
    start_time = time.perf_counter()
    try:
        yield
    finally:
        processing_time = time.perf_counter() - start_time
        message_processing_time.labels(
            message_type=message_type
        ).observe(processing_time)

# Logging setup: timestamp, logger name, level, then the message.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Logger used by the application.
logger = logging.getLogger("realtime_app")

Python异步编程实战:基于FastAPI构建高性能WebSocket实时应用
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 python Python异步编程实战:基于FastAPI构建高性能WebSocket实时应用 https://www.taomawang.com/server/python/1305.html

下一篇:

已经没有下一篇了!

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务