基于ASGI构建分布式微服务架构的完整指南
一、现代Python Web架构演进
Python Web开发已从传统同步模式演进为异步优先架构:
| 架构类型 | 代表框架 | QPS能力 | 适用场景 |
|---|---|---|---|
| 同步阻塞 | Flask/Django | 1k-3k | 传统Web应用 |
| 异步非阻塞 | FastAPI/Starlette | 10k-50k | 高并发API服务 |
| 分布式微服务 | 自定义ASGI框架 | 50k+ | 企业级云原生应用 |
本文将聚焦ASGI标准和分布式微服务架构的实现。
二、核心框架设计与实现
1. 基础ASGI应用骨架
from typing import Awaitable, Callable, Dict
class ASGIApp:
    """Minimal ASGI application: a router plus an ordered middleware chain.

    Route handlers are coroutines that receive a ``Request`` and return a
    response object; that object is then awaited as
    ``response(scope, receive, send)``.
    """

    def __init__(self):
        self.router = Router()
        self.middleware = []

    async def __call__(self, scope, receive, send):
        """ASGI entry point: wrap the core handler in middleware and run it."""
        # Wrapping in reverse registration order makes the first middleware
        # that was added the outermost one. The chain is rebuilt per call.
        handler = self.request_handler
        for middleware in reversed(self.middleware):
            handler = middleware(handler)
        await handler(scope, receive, send)

    async def request_handler(self, scope, receive, send):
        """Route the request and send the handler's response, or a 404."""
        path = scope["path"]
        method = scope["method"]

        handler, params = self.router.match(path, method)
        if handler is None:
            await self.send_404(send)
            return

        request = Request(scope, receive, params)
        response = await handler(request)
        await response(scope, receive, send)

    async def send_404(self, send):
        """Send a plain-text ``404 Not Found`` response through *send*."""
        body = b"Not Found"
        await send({
            "type": "http.response.start",
            "status": 404,
            "headers": [
                (b"content-type", b"text/plain; charset=utf-8"),
                (b"content-length", str(len(body)).encode("ascii")),
            ],
        })
        await send({"type": "http.response.body", "body": body})

    def add_route(self, path: str, methods: list, handler: Callable):
        """Register *handler* for *path* and the given HTTP *methods*."""
        self.router.add(path, methods, handler)

    def add_middleware(self, middleware: Callable):
        """Append a middleware factory; it is called with the next handler."""
        self.middleware.append(middleware)
2. 高性能路由系统
import re
from typing import Tuple, Optional, Dict, Callable
class Router:
    """Regex-backed route table supporting ``{name}`` path parameters."""

    def __init__(self):
        self.routes = []

    def add(self, path: str, methods: list, handler: Callable):
        """Register *handler* for *path*.

        Each ``{name}`` placeholder in *path* becomes a named group that
        matches one non-empty path segment (anything except ``/``).
        """
        # The backslashes and the ``<\1>`` group name are required: without
        # them no placeholder is recognised and the group is not valid regex.
        pattern = re.sub(r'\{(\w+)\}', r'(?P<\1>[^/]+)', path)
        regex = re.compile(f'^{pattern}$')
        self.routes.append({
            'regex': regex,
            'methods': methods,
            'handler': handler,
        })

    def match(self, path: str, method: str) -> Tuple[Optional[Callable], Dict]:
        """Return ``(handler, params)`` for the first matching route.

        Routes are tried in registration order. ``(None, {})`` is returned
        when no route matches both the path and the method.
        """
        for route in self.routes:
            match = route['regex'].match(path)
            if match and method in route['methods']:
                return route['handler'], match.groupdict()
        return None, {}
三、分布式微服务架构
1. 服务注册与发现
import aiohttp
from typing import List, Dict
from dataclasses import dataclass
@dataclass
class ServiceNode:
    """One addressable instance of a named service."""

    # Logical service name.
    name: str
    # Base URL of this instance.
    url: str
    # Health label; instances default to "healthy".
    health: str = "healthy"
    # Timestamp field, 0.0 until set by the caller.
    last_seen: float = 0.0
class ServiceRegistry:
    """Async client that registers and discovers services through Consul.

    Args:
        consul_url: Base URL of the Consul HTTP API.
        session: Optional pre-built HTTP session exposing aiohttp's
            ``ClientSession`` interface. When omitted, a session is created
            lazily on first use so that construction needs no running loop.
    """

    def __init__(self, consul_url: str, session=None):
        self.consul_url = consul_url
        # Last discovery result per service name.
        self.services: Dict[str, List["ServiceNode"]] = {}
        self.session = session

    def _get_session(self):
        """Return the HTTP session, creating it inside the running loop."""
        if self.session is None or getattr(self.session, "closed", False):
            self.session = aiohttp.ClientSession()
        return self.session

    async def close(self):
        """Close the HTTP session if one is open."""
        if self.session is not None:
            await self.session.close()
            self.session = None

    @staticmethod
    def _registration_payload(name: str, url: str) -> dict:
        """Build the Consul registration body for *name* reachable at *url*.

        Consul stores host and port separately, and ``discover_services``
        rebuilds ``http://Address:Port`` from them, so a full URL is split
        here. A value without a parseable host is sent unchanged as the
        address.
        """
        from urllib.parse import urlsplit

        parts = urlsplit(url)
        payload = {
            "Name": name,
            "Address": parts.hostname or url,
            "Check": {
                "HTTP": f"{url}/health",
                "Interval": "10s",
            },
        }
        if parts.hostname and parts.port is not None:
            payload["Port"] = parts.port
        return payload

    @staticmethod
    def _aggregate_health(checks) -> str:
        """Collapse a Consul ``Checks`` list into a single health label.

        Returns ``"unknown"`` when there are no checks, ``"healthy"`` when
        every check reports ``"passing"``, and ``"unhealthy"`` otherwise.
        """
        if not checks:
            return "unknown"
        if all(check.get("Status") == "passing" for check in checks):
            return "healthy"
        return "unhealthy"

    async def register_service(self, name: str, url: str):
        """Register *name* at *url* with a 10-second HTTP health check.

        Raises:
            RuntimeError: If Consul does not answer with HTTP 200.
        """
        async with self._get_session().put(
            f"{self.consul_url}/v1/agent/service/register",
            json=self._registration_payload(name, url),
        ) as resp:
            if resp.status != 200:
                raise RuntimeError("Service registration failed")

    async def discover_services(self, name: str) -> List["ServiceNode"]:
        """Fetch the instances of *name* and cache them in ``self.services``."""
        async with self._get_session().get(
            f"{self.consul_url}/v1/health/service/{name}"
        ) as resp:
            resp.raise_for_status()
            data = await resp.json()

        nodes = []
        for entry in data:
            service = entry["Service"]
            # Consul leaves the service address empty when the service is
            # reachable on the node's own address.
            address = service.get("Address") or entry.get("Node", {}).get("Address", "")
            nodes.append(
                ServiceNode(
                    name=service["Service"],
                    url=f"http://{address}:{service['Port']}",
                    health=self._aggregate_health(entry.get("Checks")),
                )
            )
        self.services[name] = nodes
        return nodes
2. 负载均衡实现
from enum import Enum
import random
import time
class LoadBalanceStrategy(Enum):
    """Node-selection policies understood by the load balancer."""

    ROUND_ROBIN = 1  # cycle through nodes in order
    RANDOM = 2       # pick any node at random
    LEAST_CONN = 3   # prefer the node with the fewest connections
class LoadBalancer:
    """Pick a healthy node for a service using a selectable strategy.

    Node lists are read from ``registry.services`` and refreshed through
    ``registry.discover_services`` once they are older than ``cache_ttl``
    seconds.
    """

    def __init__(self, registry: "ServiceRegistry"):
        self.registry = registry
        self.counters = {}       # round-robin position per service name
        self.last_refresh = 0    # time.time() of the last full refresh
        self.cache_ttl = 30      # seconds

    async def get_node(self, service_name: str, strategy: "LoadBalanceStrategy") -> "ServiceNode":
        """Return one healthy node of *service_name* chosen by *strategy*.

        Raises:
            LookupError: If the service has no nodes, or none is healthy.
            ValueError: If *strategy* is not a supported strategy.
        """
        if time.time() - self.last_refresh > self.cache_ttl:
            await self.refresh_services(service_name)
        elif service_name not in self.registry.services:
            # First lookup of this service between refreshes: fetch just it
            # and leave the refresh timer of the other services untouched.
            self.registry.services[service_name] = (
                await self.registry.discover_services(service_name)
            )

        nodes = self.registry.services.get(service_name, [])
        if not nodes:
            raise LookupError(f"No available nodes for service {service_name}")

        healthy_nodes = [n for n in nodes if n.health == "healthy"]
        if not healthy_nodes:
            raise LookupError(f"No healthy nodes for service {service_name}")

        if strategy == LoadBalanceStrategy.ROUND_ROBIN:
            return self._round_robin(service_name, healthy_nodes)
        if strategy == LoadBalanceStrategy.RANDOM:
            return random.choice(healthy_nodes)
        if strategy == LoadBalanceStrategy.LEAST_CONN:
            return self._least_connections(healthy_nodes)
        # Fail loudly instead of silently returning None.
        raise ValueError(f"Unsupported load-balancing strategy: {strategy!r}")

    async def refresh_services(self, *service_names: str) -> None:
        """Re-discover every cached service plus any *service_names* given."""
        # dict.fromkeys de-duplicates while keeping a stable order.
        names = dict.fromkeys([*self.registry.services, *service_names])
        for name in names:
            self.registry.services[name] = await self.registry.discover_services(name)
        self.last_refresh = time.time()

    def _round_robin(self, service_name: str, nodes: List["ServiceNode"]) -> "ServiceNode":
        """Return the next node in rotation for *service_name*."""
        idx = self.counters.get(service_name, 0) % len(nodes)
        self.counters[service_name] = idx + 1
        return nodes[idx]

    def _least_connections(self, nodes: List["ServiceNode"]) -> "ServiceNode":
        """Return the node reporting the fewest ``active_conns``.

        Nodes without a ``metrics`` mapping, or without that key, count as
        having zero active connections.
        """
        def active_conns(node) -> int:
            metrics = getattr(node, "metrics", None) or {}
            return metrics.get("active_conns", 0)

        return min(nodes, key=active_conns)
四、高级特性实现
1. 分布式追踪集成
import contextvars
from typing import Optional
import uuid
# Trace ID of the request being handled in the current context, or None.
trace_id = contextvars.ContextVar('trace_id', default=None)


class TracingMiddleware:
    """ASGI middleware that propagates an ``x-trace-id`` per request.

    The ID is taken from the incoming ``x-trace-id`` header or generated
    with ``uuid4``. It is exposed through the ``trace_id`` context variable
    while the wrapped app runs and echoed back on the response headers.
    """

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        request_headers = dict(scope.get("headers", []))
        req_trace_id = request_headers.get(b"x-trace-id")
        if not req_trace_id:
            req_trace_id = str(uuid.uuid4()).encode()

        # latin-1 maps every byte to a character, so an unusual header value
        # cannot raise UnicodeDecodeError here.
        token = trace_id.set(req_trace_id.decode("latin-1"))

        async def send_with_tracing(message):
            if message["type"] == "http.response.start":
                # Build a new message rather than mutating the app's dict.
                message = {
                    **message,
                    "headers": [
                        (b"x-trace-id", req_trace_id),
                        *message.get("headers", []),
                    ],
                }
            await send(message)

        try:
            await self.app(scope, receive, send_with_tracing)
        finally:
            # Restore the previous value once this request is done.
            trace_id.reset(token)
2. 自适应限流算法
import time
from collections import deque
class RateLimitExceeded(Exception):
    """Raised by ``AdaptiveRateLimiter.acquire`` when a request is rejected."""


class AdaptiveRateLimiter:
    """Sliding one-second window limiter with a cooldown phase.

    Once ``max_rate`` requests fall inside the window the limiter enters
    cooldown and keeps rejecting until the in-window count has dropped
    below ``COOLDOWN_EXIT_RATIO * max_rate``.
    """

    # NOTE(review): the original cooldown-exit condition was lost when the
    # source text was extracted; 0.8 is an assumed threshold, to be confirmed.
    COOLDOWN_EXIT_RATIO = 0.8

    def __init__(self, max_rate: int, window_size: int = 10):
        self.max_rate = max_rate
        self.window_size = window_size
        self.request_times = deque()  # timestamps of accepted requests
        self.current_rate = 0
        self.cooldown = False

    async def acquire(self):
        """Record one request, returning True when it is admitted.

        Raises:
            RateLimitExceeded: If the limiter is cooling down or the window
                already holds ``max_rate`` requests.
        """
        now = time.time()

        # Drop requests that are older than the one-second window.
        while self.request_times and now - self.request_times[0] > 1.0:
            self.request_times.popleft()

        if self.cooldown:
            if len(self.request_times) < self.max_rate * self.COOLDOWN_EXIT_RATIO:
                self.cooldown = False
            else:
                raise RateLimitExceeded("In cooldown period")

        if len(self.request_times) >= self.max_rate:
            self.cooldown = True
            raise RateLimitExceeded("Rate limit exceeded")

        self.request_times.append(now)
        return True
五、性能优化策略
1. 连接池管理
from typing import Dict, List
import asyncio
class ConnectionPool:
    """Per-URL connection pool capped at ``max_size`` connections per URL.

    ``get_connection`` reuses an idle connection when one exists, creates a
    new one while fewer than ``max_size`` exist for that URL, and otherwise
    waits until ``release_connection`` hands one back.
    """

    def __init__(self, max_size=10):
        self.max_size = max_size
        self.pools: Dict[str, List] = {}     # idle connections per URL
        # One asyncio.Condition per URL. It works with ``async with`` like a
        # lock and lets waiters sleep without holding it.
        self.locks = {}
        self._created: Dict[str, int] = {}   # connections created per URL

    def _condition_for(self, url: str):
        """Return the condition guarding *url*, creating its state if new."""
        condition = self.locks.get(url)
        if condition is None:
            condition = self.locks[url] = asyncio.Condition()
        self.pools.setdefault(url, [])
        self._created.setdefault(url, 0)
        return condition

    async def get_connection(self, url: str):
        """Return a connection for *url*, waiting if the pool is exhausted."""
        condition = self._condition_for(url)
        async with condition:
            # wait() releases the condition while sleeping, so
            # release_connection can acquire it and wake this waiter.
            while not self.pools[url] and self._created[url] >= self.max_size:
                await condition.wait()
            if self.pools[url]:
                return self.pools[url].pop()
            # Reserve the slot before awaiting so concurrent callers cannot
            # exceed max_size.
            self._created[url] += 1

        try:
            return await self._create_connection(url)
        except BaseException:
            # Creation failed: free the reserved slot and wake one waiter.
            async with condition:
                self._created[url] -= 1
                condition.notify()
            raise

    async def release_connection(self, url: str, conn):
        """Return *conn* to the idle list of *url* and wake one waiter."""
        condition = self._condition_for(url)
        async with condition:
            self.pools[url].append(conn)
            condition.notify()

    async def _create_connection(self, url: str):
        # Placeholder: a real implementation would open a connection here.
        return {"url": url, "created_at": time.time()}
2. 智能缓存策略
from datetime import datetime, timedelta
import pickle
import hashlib
class SmartCache:
    """Result cache for coroutine functions backed by an async Redis client.

    Args:
        redis_conn: Client whose ``get``, ``setex`` and ``delete`` methods
            are awaitable.
        default_ttl: Expiry in seconds used when a call passes no ``ttl``.
    """

    def __init__(self, redis_conn, default_ttl=300):
        self.redis = redis_conn
        self.default_ttl = default_ttl

    def key_for(self, func, *args, **kwargs):
        """Return the cache key for calling *func* with these arguments.

        Keyword arguments are sorted by name first, so the key does not
        depend on the order in which the caller wrote them.
        """
        ordered_kwargs = dict(sorted(kwargs.items()))
        args_str = pickle.dumps((args, ordered_kwargs))
        func_name = func.__module__ + "." + func.__qualname__
        hash_str = hashlib.md5(args_str).hexdigest()
        return f"cache:{func_name}:{hash_str}"

    async def cached(self, func, *args, ttl=None, **kwargs):
        """Return the cached result of ``func(*args, **kwargs)``.

        On a miss the coroutine is awaited and its pickled result is stored
        for ``ttl`` seconds. A ``ttl`` of None or 0 falls back to
        ``default_ttl``.
        """
        cache_key = self.key_for(func, *args, **kwargs)

        cached = await self.redis.get(cache_key)
        if cached is not None:
            # SECURITY: pickle.loads executes code embedded in its input.
            # This is only safe while nothing untrusted can write to Redis.
            return pickle.loads(cached)

        result = await func(*args, **kwargs)
        await self.redis.setex(
            cache_key,
            ttl or self.default_ttl,
            pickle.dumps(result),
        )
        return result

    async def invalidate(self, func, *args, **kwargs):
        """Delete the cached entry for this exact call, if any."""
        cache_key = self.key_for(func, *args, **kwargs)
        await self.redis.delete(cache_key)
六、实战案例:订单微服务
1. 服务定义与实现
from dataclasses import dataclass
from typing import Optional
@dataclass
class Order:
    """Plain record describing one order."""

    # Order identifier.
    id: str
    # Identifier of the user who owns the order.
    user_id: int
    # Line items, one dict per item.
    items: List[Dict]
    # Status label, e.g. "created".
    status: str
    # Creation timestamp.
    created_at: float
class OrderService:
    """Create and load orders through a database connection pool.

    NOTE(review): the ``$1`` placeholders and ``acquire``/``fetchrow`` calls
    look like an asyncpg-style pool; confirm against the caller.
    """

    def __init__(self, db_pool):
        self.db = db_pool

    async def create_order(self, user_id: int, items: List[Dict]) -> Order:
        """Insert a new order with status ``"created"`` and return it."""
        new_order = Order(
            id=str(uuid.uuid4()),
            user_id=user_id,
            items=items,
            status="created",
            created_at=time.time(),
        )
        insert_args = (
            new_order.id,
            new_order.user_id,
            new_order.items,
            new_order.status,
            new_order.created_at,
        )
        async with self.db.acquire() as conn:
            await conn.execute(
                "INSERT INTO orders VALUES ($1, $2, $3, $4, $5)",
                *insert_args,
            )
        return new_order

    async def get_order(self, order_id: str) -> Optional[Order]:
        """Return the order with *order_id*, or None when no row is found."""
        async with self.db.acquire() as conn:
            record = await conn.fetchrow(
                "SELECT * FROM orders WHERE id = $1",
                order_id,
            )
        return Order(**record) if record else None
2. ASGI接口暴露
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
# Application instance that the route decorators below attach to.
app = FastAPI()
# Register CORS handling with wildcard origins, methods and headers.
# NOTE(review): wildcards accept cross-origin requests from any site;
# confirm this is intended outside of a demo.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
@app.post("/orders")
async def create_order(order_data: dict):
    """Create an order from the request body and return its ID.

    The body must contain the ``user_id`` and ``items`` keys.
    """
    service = get_order_service()
    created = await service.create_order(order_data["user_id"], order_data["items"])
    return {"order_id": created.id}
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    """Return the order with *order_id*, or respond with HTTP 404.

    Raises:
        HTTPException: With status 404 when the order does not exist.
    """
    # Imported here because the file's fastapi import line does not bring
    # HTTPException into scope.
    from fastapi import HTTPException

    order_service = get_order_service()
    order = await order_service.get_order(order_id)
    if order is None:
        raise HTTPException(status_code=404)
    return order