从基础概念到企业级应用,全面掌握Python异步编程与FastAPI开发
一、异步编程核心概念深度解析
1.1 同步 vs 异步的本质区别
同步编程采用阻塞式调用,程序必须等待当前操作完成才能继续执行。异步编程则通过事件循环和非阻塞IO,在等待IO操作时释放CPU资源处理其他任务。
import asyncio
import time
# 同步版本
def sync_task(name, delay):
print(f"{name} 开始执行")
time.sleep(delay) # 阻塞调用
print(f"{name} 执行完成")
return f"{name} 结果"
# 异步版本
async def async_task(name, delay):
print(f"{name} 开始执行")
await asyncio.sleep(delay) # 非阻塞调用
print(f"{name} 执行完成")
return f"{name} 结果"
# 同步执行(总耗时3秒)
start = time.time()
sync_task("任务1", 1)
sync_task("任务2", 2)
print(f"同步执行耗时: {time.time() - start:.2f}秒")
# 异步执行(总耗时2秒)
async def main():
start = time.time()
task1 = async_task("异步任务1", 1)
task2 = async_task("异步任务2", 2)
results = await asyncio.gather(task1, task2)
print(f"异步执行耗时: {time.time() - start:.2f}秒")
print(f"执行结果: {results}")
# asyncio.run(main())
1.2 异步编程核心组件
- 事件循环(Event Loop):异步编程的心脏,负责调度和执行协程
- 协程(Coroutine):使用async def定义的异步函数
- 任务(Task):对协程的封装,用于并发执行
- Future:表示异步操作的最终结果
import asyncio
async def coroutine_demo():
"""基础协程示例"""
print("协程开始")
await asyncio.sleep(1)
print("协程结束")
return "完成"
async def task_management():
"""任务管理示例"""
# 创建任务
task1 = asyncio.create_task(coroutine_demo())
task2 = asyncio.create_task(coroutine_demo())
# 等待任务完成
results = await asyncio.gather(task1, task2)
print(f"所有任务完成: {results}")
async def future_demo():
"""Future对象示例"""
loop = asyncio.get_running_loop()
future = loop.create_future()
async def set_result():
await asyncio.sleep(1)
future.set_result("Future结果")
# 启动设置结果的协程
asyncio.create_task(set_result())
# 等待Future完成
result = await future
print(f"Future结果: {result}")
# 运行示例
# asyncio.run(task_management())
# asyncio.run(future_demo())
二、FastAPI基础实战:构建高性能API
2.1 项目结构设计与环境配置
# 项目结构
fastapi-project/
├── app/
│ ├── __init__.py
│ ├── main.py # 应用入口
│ ├── core/ # 核心配置
│ │ ├── __init__.py
│ │ ├── config.py # 配置文件
│ │ └── security.py # 安全配置
│ ├── api/ # API路由
│ │ ├── __init__.py
│ │ ├── v1/ # API版本1
│ │ │ ├── __init__.py
│ │ │ ├── endpoints/ # 端点模块
│ │ │ └── deps.py # 依赖项
│ ├── models/ # 数据模型
│ ├── schemas/ # Pydantic模式
│ └── services/ # 业务逻辑
2.2 基础API开发实战
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, EmailStr
from typing import List, Optional
import uuid
from datetime import datetime
# 创建FastAPI应用
app = FastAPI(
title="用户管理系统API",
description="基于FastAPI的高性能用户管理接口",
version="1.0.0"
)
# 数据模型
class UserBase(BaseModel):
username: str
email: EmailStr
full_name: Optional[str] = None
class UserCreate(UserBase):
password: str
class UserResponse(UserBase):
id: str
created_at: datetime
class Config:
from_attributes = True
# 模拟数据库
fake_users_db = {}
@app.post("/users/", response_model=UserResponse)
async def create_user(user: UserCreate):
"""创建用户接口"""
user_id = str(uuid.uuid4())
# 检查用户名是否已存在
for existing_user in fake_users_db.values():
if existing_user["username"] == user.username:
raise HTTPException(status_code=400, detail="用户名已存在")
# 创建用户记录
user_data = {
"id": user_id,
"username": user.username,
"email": user.email,
"full_name": user.full_name,
"created_at": datetime.now()
}
fake_users_db[user_id] = user_data
return user_data
@app.get("/users/", response_model=List[UserResponse])
async def list_users(skip: int = 0, limit: int = 10):
"""获取用户列表"""
users = list(fake_users_db.values())
return users[skip:skip + limit]
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: str):
"""获取单个用户详情"""
if user_id not in fake_users_db:
raise HTTPException(status_code=404, detail="用户不存在")
return fake_users_db[user_id]
@app.put("/users/{user_id}", response_model=UserResponse)
async def update_user(user_id: str, user_update: UserBase):
"""更新用户信息"""
if user_id not in fake_users_db:
raise HTTPException(status_code=404, detail="用户不存在")
# 更新用户数据
fake_users_db[user_id].update(user_update.dict(exclude_unset=True))
return fake_users_db[user_id]
# 运行应用
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
三、FastAPI高级特性深度应用
3.1 依赖注入系统
from fastapi import Depends, Header, HTTPException
from typing import Optional
async def get_token_header(x_token: str = Header(...)):
"""Token验证依赖"""
if x_token != "fake-super-secret-token":
raise HTTPException(status_code=400, detail="无效的Token")
return x_token
async def get_query_token(token: Optional[str] = None):
"""查询参数Token验证"""
if not token:
raise HTTPException(status_code=400, detail="Token参数缺失")
return token
class UserService:
"""用户服务类"""
def __init__(self):
self.users = {}
async def get_current_user(self, token: str = Depends(get_token_header)):
"""获取当前用户依赖"""
# 模拟根据token获取用户
user = self.users.get(token)
if not user:
raise HTTPException(status_code=404, detail="用户未找到")
return user
async def is_admin(self, user: dict = Depends(get_current_user)):
"""管理员权限验证"""
if not user.get("is_admin", False):
raise HTTPException(status_code=403, detail="权限不足")
return user
# 使用依赖注入
@app.get("/admin/", dependencies=[Depends(get_token_header)])
async def admin_dashboard():
return {"message": "管理员面板"}
@app.get("/users/me/")
async def read_users_me(current_user: dict = Depends(UserService().get_current_user)):
return current_user
@app.get("/admin/users/")
async def admin_users(admin_user: dict = Depends(UserService().is_admin)):
return {"message": "管理员用户列表", "user": admin_user}
3.2 后台任务与WebSocket实时通信
from fastapi import BackgroundTasks, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import asyncio
import json
# 后台任务示例
def write_log(message: str):
"""后台写入日志"""
with open("app.log", "a") as f:
f.write(f"{datetime.now()}: {message}n")
@app.post("/send-notification/")
async def send_notification(
message: str,
background_tasks: BackgroundTasks
):
"""发送通知(包含后台任务)"""
background_tasks.add_task(write_log, f"通知发送: {message}")
return {"message": "通知已发送", "content": message}
# WebSocket连接管理器
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
message = {"client_id": client_id, "content": data}
await manager.broadcast(json.dumps(message))
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"客户端 {client_id} 已断开连接")
3.3 数据库异步操作与ORM集成
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, String, DateTime, Text
import os
# 数据库配置
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///./test.db")
# 创建异步引擎
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
Base = declarative_base()
# 定义数据模型
class User(Base):
__tablename__ = "users"
id = Column(String(36), primary_key=True, index=True)
username = Column(String(50), unique=True, index=True)
email = Column(String(100), unique=True, index=True)
full_name = Column(String(100))
created_at = Column(DateTime, default=datetime.now)
# 数据库依赖
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
@app.post("/async-users/", response_model=UserResponse)
async def create_async_user(
user: UserCreate,
db: AsyncSession = Depends(get_db)
):
"""异步创建用户"""
# 检查用户是否存在
existing_user = await db.execute(
select(User).where(User.username == user.username)
)
if existing_user.scalar_one_or_none():
raise HTTPException(status_code=400, detail="用户名已存在")
# 创建新用户
db_user = User(
id=str(uuid.uuid4()),
username=user.username,
email=user.email,
full_name=user.full_name
)
db.add(db_user)
await db.commit()
await db.refresh(db_user)
return db_user
# 创建数据库表
@app.on_event("startup")
async def startup_event():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
四、完整项目实战:电商订单系统
4.1 系统架构设计
# app/main.py
from fastapi import FastAPI
from contextlib import asynccontextmanager
from app.api.v1 import api_router
from app.core.config import settings
from app.core.database import init_db
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时初始化数据库
await init_db()
yield
# 关闭时清理资源
pass
app = FastAPI(
title=settings.PROJECT_NAME,
version=settings.VERSION,
lifespan=lifespan
)
# 注册路由
app.include_router(api_router, prefix="/api/v1")
# app/core/config.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
PROJECT_NAME: str = "电商订单系统"
VERSION: str = "1.0.0"
DATABASE_URL: str = "sqlite+aiosqlite:///./ecommerce.db"
SECRET_KEY: str = "your-secret-key"
class Config:
env_file = ".env"
settings = Settings()
# app/api/v1/endpoints/orders.py
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from typing import List
from app.schemas.order import OrderCreate, OrderResponse
from app.services.order_service import OrderService
from app.api.v1.deps import get_order_service
router = APIRouter()
@router.post("/orders/", response_model=OrderResponse)
async def create_order(
order: OrderCreate,
background_tasks: BackgroundTasks,
order_service: OrderService = Depends(get_order_service)
):
"""创建订单"""
return await order_service.create_order(
order, background_tasks=background_tasks
)
@router.get("/orders/{order_id}", response_model=OrderResponse)
async def get_order(
order_id: str,
order_service: OrderService = Depends(get_order_service)
):
"""获取订单详情"""
order = await order_service.get_order(order_id)
if not order:
raise HTTPException(status_code=404, detail="订单不存在")
return order
# app/services/order_service.py
class OrderService:
def __init__(self, db: AsyncSession):
self.db = db
async def create_order(
self,
order_data: OrderCreate,
background_tasks: BackgroundTasks = None
):
"""创建订单业务逻辑"""
# 验证库存
if not await self._check_inventory(order_data.items):
raise HTTPException(status_code=400, detail="库存不足")
# 创建订单
order = await self._create_order_record(order_data)
# 后台任务:发送通知、更新库存等
if background_tasks:
background_tasks.add_task(self._process_order_async, order.id)
return order
async def _check_inventory(self, items: List[dict]) -> bool:
"""检查库存"""
# 实现库存检查逻辑
return True
async def _create_order_record(self, order_data: OrderCreate):
"""创建订单记录"""
# 实现订单创建逻辑
pass
async def _process_order_async(self, order_id: str):
"""异步处理订单"""
await asyncio.sleep(1) # 模拟处理时间
print(f"订单 {order_id} 处理完成")
4.2 性能测试与优化
import asyncio
import aiohttp
import time
from concurrent.futures import ThreadPoolExecutor
async def test_concurrent_requests():
"""并发请求测试"""
async with aiohttp.ClientSession() as session:
tasks = []
for i in range(100): # 100个并发请求
task = session.get('http://localhost:8000/users/')
tasks.append(task)
start_time = time.time()
responses = await asyncio.gather(*tasks)
end_time = time.time()
print(f"并发请求数量: {len(responses)}")
print(f"总耗时: {end_time - start_time:.2f}秒")
print(f"平均响应时间: {(end_time - start_time) * 1000 / len(responses):.2f}毫秒")
# 运行性能测试
# asyncio.run(test_concurrent_requests())
# 使用ab进行压力测试
# ab -n 1000 -c 100 http://localhost:8000/users/
总结与最佳实践
通过本教程,我们深入探讨了Python异步编程的核心概念和FastAPI的高级应用。关键要点包括:
- 异步编程优势:显著提升I/O密集型应用的性能
- FastAPI特性:自动文档生成、类型验证、依赖注入
- 架构设计:清晰的项目结构、服务分层、依赖管理
- 性能优化:异步数据库操作、后台任务、连接池
在实际项目中,建议:
- 合理使用异步,避免在CPU密集型任务中滥用
- 采用分层架构,保持代码的可维护性
- 充分利用FastAPI的自动验证和文档功能
- 实施完整的错误处理和日志记录