const ws = new WebSocket(“ws://localhost:8000/ws/test_user”);
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
const div = document.getElementById(“messages”);
div.innerHTML += “
” + JSON.stringify(data) + “
“;
};
function send() {
const msg = document.getElementById(“msg”).value;
ws.send(JSON.stringify({type: “chat”, message: msg}));
}
“””
6. 实战案例四:异步任务调度与后台任务
使用FastAPI的BackgroundTasks和Celery处理耗时任务。
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
import asyncio
import time
from typing import Dict
app = FastAPI()
# 模拟后台任务
async def send_email_async(email: str, content: str):
"""异步发送邮件"""
print(f"开始发送邮件到 {email}")
await asyncio.sleep(3) # 模拟SMTP通信
print(f"邮件发送完成: {content[:50]}...")
def send_email_sync(email: str, content: str):
"""同步发送邮件(在后台线程中执行)"""
print(f"开始发送邮件到 {email}")
time.sleep(3)
print(f"邮件发送完成: {content[:50]}...")
# 任务状态存储
task_status: Dict[str, str] = {}
@app.post("/send-notification")
async def send_notification(
email: str,
content: str,
background_tasks: BackgroundTasks
):
# 方式1:异步后台任务(推荐)
background_tasks.add_task(send_email_async, email, content)
return {"message": "通知发送任务已提交"}
@app.post("/send-notification-sync")
async def send_notification_sync(
email: str,
content: str,
background_tasks: BackgroundTasks
):
# 方式2:同步后台任务(在线程池中运行)
background_tasks.add_task(send_email_sync, email, content)
return {"message": "同步通知任务已提交"}
# 更复杂的后台任务管理
class TaskManager:
def __init__(self):
self.tasks = {}
self.task_id = 0
async def run_long_task(self, task_id: int, duration: int):
task_status[task_id] = "running"
try:
for i in range(duration):
await asyncio.sleep(1)
task_status[task_id] = f"progress: {i+1}/{duration}"
task_status[task_id] = "completed"
except Exception as e:
task_status[task_id] = f"failed: {str(e)}"
task_manager = TaskManager()
@app.post("/start-task")
async def start_task(duration: int = 5):
task_id = len(task_status) + 1
task_status[task_id] = "pending"
# 在后台运行任务
asyncio.create_task(task_manager.run_long_task(task_id, duration))
return {"task_id": task_id, "status": "pending"}
@app.get("/task-status/{task_id}")
async def get_task_status(task_id: int):
status = task_status.get(task_id, "not_found")
return {"task_id": task_id, "status": status}
7. 性能对比:同步 vs 异步
| 指标 | 同步Flask | 异步FastAPI |
|---|---|---|
| 并发连接数(单进程) | 约200 | 5000+ |
| 100个IO请求耗时 | 约10秒(串行) | 约1秒(并发) |
| CPU密集型任务 | 阻塞事件循环 | 需使用线程池 |
| 内存占用/连接 | 约1MB | 约2KB |
| 代码复杂度 | 简单 | 中等(需要理解异步) |
8. 最佳实践总结
- 使用 async/await:所有IO操作都使用异步库(httpx、asyncpg、aiomysql)
- 避免阻塞事件循环:CPU密集型任务使用
asyncio.to_thread()或线程池 - 连接池管理:数据库连接池、HTTP连接池复用
- 错误处理:使用try/except捕获异步异常,避免任务静默失败
- 监控与日志:使用structlog、opentelemetry追踪异步任务
# 避免阻塞事件循环的示例
import asyncio
import hashlib
async def compute_hash(data: bytes) -> str:
# CPU密集型任务:使用to_thread避免阻塞
result = await asyncio.to_thread(hashlib.sha256, data)
return result.hexdigest()
async def main():
# 并发执行多个CPU密集型任务
tasks = [
compute_hash(b"data1" * 1000000),
compute_hash(b"data2" * 1000000),
compute_hash(b"data3" * 1000000)
]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
9. 总结
通过本文的案例,你掌握了Python异步编程和FastAPI的核心技术:
- async/await语法与事件循环
- FastAPI基础API构建
- 异步数据库操作(SQLAlchemy + databases)
- WebSocket实时通信
- 后台任务调度与任务管理
- 性能对比与最佳实践
Python异步编程让Web服务拥有了媲美Node.js和Go的并发能力。结合FastAPI的自动文档和类型检查,你可以构建出高性能、可维护的现代Web应用。现在就开始你的异步Python之旅吧!
本文原创,基于Python 3.12+和FastAPI 0.110+。所有代码均在Python 3.12环境中测试通过。

