Python 异步编程与FastAPI实战:构建高并发Web服务

2026-05-02 0 647

const ws = new WebSocket(“ws://localhost:8000/ws/test_user”);
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
const div = document.getElementById(“messages”);
div.innerHTML += “

” + JSON.stringify(data) + “

“;
};
function send() {
const msg = document.getElementById(“msg”).value;
ws.send(JSON.stringify({type: “chat”, message: msg}));
}

“””


6. 实战案例四:异步任务调度与后台任务

使用FastAPI的BackgroundTasks和Celery处理耗时任务。

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
import asyncio
import time
from typing import Dict

app = FastAPI()

# 模拟后台任务
async def send_email_async(email: str, content: str):
    """异步发送邮件"""
    print(f"开始发送邮件到 {email}")
    await asyncio.sleep(3)  # 模拟SMTP通信
    print(f"邮件发送完成: {content[:50]}...")

def send_email_sync(email: str, content: str):
    """同步发送邮件(在后台线程中执行)"""
    print(f"开始发送邮件到 {email}")
    time.sleep(3)
    print(f"邮件发送完成: {content[:50]}...")

# 任务状态存储
task_status: Dict[str, str] = {}

@app.post("/send-notification")
async def send_notification(
    email: str, 
    content: str, 
    background_tasks: BackgroundTasks
):
    # 方式1:异步后台任务(推荐)
    background_tasks.add_task(send_email_async, email, content)
    return {"message": "通知发送任务已提交"}

@app.post("/send-notification-sync")
async def send_notification_sync(
    email: str, 
    content: str, 
    background_tasks: BackgroundTasks
):
    # 方式2:同步后台任务(在线程池中运行)
    background_tasks.add_task(send_email_sync, email, content)
    return {"message": "同步通知任务已提交"}

# 更复杂的后台任务管理
class TaskManager:
    def __init__(self):
        self.tasks = {}
        self.task_id = 0
    
    async def run_long_task(self, task_id: int, duration: int):
        task_status[task_id] = "running"
        try:
            for i in range(duration):
                await asyncio.sleep(1)
                task_status[task_id] = f"progress: {i+1}/{duration}"
            task_status[task_id] = "completed"
        except Exception as e:
            task_status[task_id] = f"failed: {str(e)}"

task_manager = TaskManager()

@app.post("/start-task")
async def start_task(duration: int = 5):
    task_id = len(task_status) + 1
    task_status[task_id] = "pending"
    # 在后台运行任务
    asyncio.create_task(task_manager.run_long_task(task_id, duration))
    return {"task_id": task_id, "status": "pending"}

@app.get("/task-status/{task_id}")
async def get_task_status(task_id: int):
    status = task_status.get(task_id, "not_found")
    return {"task_id": task_id, "status": status}

7. 性能对比:同步 vs 异步

指标 同步Flask 异步FastAPI
并发连接数(单进程) 约200 5000+
100个IO请求耗时 约10秒(串行) 约1秒(并发)
CPU密集型任务 阻塞事件循环 需使用线程池
内存占用/连接 约1MB 约2KB
代码复杂度 简单 中等(需要理解异步)

8. 最佳实践总结

  • 使用 async/await:所有IO操作都使用异步库(httpx、asyncpg、aiomysql)
  • 避免阻塞事件循环:CPU密集型任务使用 asyncio.to_thread() 或线程池
  • 连接池管理:数据库连接池、HTTP连接池复用
  • 错误处理:使用try/except捕获异步异常,避免任务静默失败
  • 监控与日志:使用structlog、opentelemetry追踪异步任务
# 避免阻塞事件循环的示例
import asyncio
import hashlib

async def compute_hash(data: bytes) -> str:
    # CPU密集型任务:使用to_thread避免阻塞
    result = await asyncio.to_thread(hashlib.sha256, data)
    return result.hexdigest()

async def main():
    # 并发执行多个CPU密集型任务
    tasks = [
        compute_hash(b"data1" * 1000000),
        compute_hash(b"data2" * 1000000),
        compute_hash(b"data3" * 1000000)
    ]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())

9. 总结

通过本文的案例,你掌握了Python异步编程和FastAPI的核心技术:

  • async/await语法与事件循环
  • FastAPI基础API构建
  • 异步数据库操作(SQLAlchemy + databases)
  • WebSocket实时通信
  • 后台任务调度与任务管理
  • 性能对比与最佳实践

Python异步编程让Web服务拥有了媲美Node.js和Go的并发能力。结合FastAPI的自动文档和类型检查,你可以构建出高性能、可维护的现代Web应用。现在就开始你的异步Python之旅吧!


本文原创,基于Python 3.12+和FastAPI 0.110+。所有代码均在Python 3.12环境中测试通过。

Python 异步编程与FastAPI实战:构建高并发Web服务
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 python Python 异步编程与FastAPI实战:构建高并发Web服务 https://www.taomawang.com/server/python/1767.html

下一篇:

已经没有下一篇了!

常见问题

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务