A New Paradigm for Asynchronous Python: Coroutines and Task Scheduling in Depth
I. Core Coroutine Mechanics
Breaking through the performance bottleneck of traditional synchronous code:
import asyncio
from datetime import datetime

async def task(name, delay):
    print(f"[{datetime.now()}] Task {name} started")
    await asyncio.sleep(delay)
    print(f"[{datetime.now()}] Task {name} finished")
    return f"Result-{name}"

async def main():
    # Create the task list
    tasks = [
        asyncio.create_task(task("A", 2)),
        asyncio.create_task(task("B", 1)),
        asyncio.create_task(task("C", 3)),
    ]
    # Wait for the first task to complete
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
    # Handle whatever finished first
    for completed_task in done:
        print(f"First completed: {completed_task.result()}")
    # Cancel the remaining tasks (loop variable renamed so it does not shadow task())
    for pending_task in pending:
        pending_task.cancel()

# Run the event loop
asyncio.run(main())
Key advantages: non-blocking I/O, high concurrency, low resource consumption, and concise code.
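To make the concurrency gain concrete, here is a minimal self-contained sketch that times three sleeps run back to back versus scheduled together with asyncio.gather; the total drops from roughly the sum of the delays to roughly the longest one.

import asyncio
import time

async def sleepy(delay):
    await asyncio.sleep(delay)
    return delay

async def compare():
    # Sequential: each await blocks the next (~6 s total)
    start = time.perf_counter()
    for d in (2, 1, 3):
        await sleepy(d)
    print(f"Sequential: {time.perf_counter() - start:.1f}s")
    # Concurrent: gather runs all three at once (~3 s total)
    start = time.perf_counter()
    await asyncio.gather(sleepy(2), sleepy(1), sleepy(3))
    print(f"Concurrent: {time.perf_counter() - start:.1f}s")

asyncio.run(compare())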
II. Advanced Coroutine Patterns
1. Coroutine Communication and Synchronization
async def producer(queue, item):
    await queue.put(item)
    print(f"Produced: {item}")

async def consumer(queue):
    while True:
        item = await queue.get()
        print(f"Consumed: {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=3)
    # Start producers and consumers
    producers = [asyncio.create_task(producer(queue, i))
                 for i in range(5)]
    consumers = [asyncio.create_task(consumer(queue))
                 for _ in range(2)]
    await asyncio.gather(*producers)
    await queue.join()  # Wait until every queued item has been processed
    for c in consumers:
        c.cancel()

# Limit concurrency with a semaphore (fetch() and urls are placeholders)
async def limited_task(sem, url):
    async with sem:
        return await fetch(url)

async def fetch_all_limited(urls):
    # Wrapper coroutine so these awaits run inside the event loop
    sem = asyncio.Semaphore(10)
    tasks = [limited_task(sem, url) for url in urls]
    return await asyncio.gather(*tasks)
2. Coroutine Timeouts and Retries
async def fetch_with_retry(url, retries=3, timeout=5):
    for attempt in range(retries):
        try:
            async with asyncio.timeout(timeout):  # asyncio.timeout requires Python 3.11+
                return await fetch(url)  # fetch() is a placeholder
        except Exception as e:
            print(f"Attempt {attempt+1} failed: {e}")
            if attempt == retries - 1:
                raise
            await asyncio.sleep(1 * (attempt + 1))  # linear backoff between attempts
    return None

# Process tasks in batches (an async generator)
async def batch_process(items, batch_size=10):
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        tasks = [process_item(item) for item in batch]  # process_item() is a placeholder
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                print(f"Error processing: {result}")
            else:
                yield result
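Since batch_process is an async generator, it is consumed with async for. A minimal usage sketch, assuming a stand-in process_item coroutine:

async def process_item(item):
    # Stand-in worker: simulate a little I/O per item
    await asyncio.sleep(0.1)
    return item * 2

async def demo_batches():
    async for result in batch_process(list(range(25)), batch_size=10):
        print(f"Got result: {result}")

asyncio.run(demo_batches())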
III. Performance Optimization in Practice
1. Asynchronous Database Access
import asyncpg
from asyncpg.pool import Pool

async def init_pool() -> Pool:
    return await asyncpg.create_pool(
        min_size=5,
        max_size=20,
        command_timeout=60,
        host='localhost',
        user='postgres'
    )

async def fetch_users(pool: Pool):
    async with pool.acquire() as conn:
        return await conn.fetch(
            "SELECT * FROM users WHERE active = $1", True
        )

# Transaction example: both updates commit together or roll back together
async def transfer_funds(pool, from_id, to_id, amount):
    async with pool.acquire() as conn:
        async with conn.transaction():
            await conn.execute(
                "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
                amount, from_id
            )
            await conn.execute(
                "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
                amount, to_id
            )
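A minimal usage sketch tying the helpers above together; the account ids and amount are assumptions, purely for illustration:

async def db_demo():
    pool = await init_pool()
    try:
        users = await fetch_users(pool)
        print(f"Active users: {len(users)}")
        # Hypothetical account ids and amount
        await transfer_funds(pool, from_id=1, to_id=2, amount=100)
    finally:
        await pool.close()

asyncio.run(db_demo())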
2. A High-Performance HTTP Client
import aiohttp
from aiohttp import ClientSession

async def fetch_all(urls):
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            task = asyncio.create_task(
                fetch_url(session, url)
            )
            tasks.append(task)
        return await asyncio.gather(*tasks)

async def fetch_url(session: ClientSession, url):
    try:
        # Pass an explicit ClientTimeout for the per-request timeout
        timeout = aiohttp.ClientTimeout(total=10)
        async with session.get(url, timeout=timeout) as response:
            if response.status == 200:
                return await response.json()
            return None
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None

# Rate-limited requests
async def rate_limited_fetch(sem, session, url):
    async with sem:
        await asyncio.sleep(0.5)  # 500 ms delay before each request
        return await fetch_url(session, url)
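A minimal usage sketch for the rate-limited variant; the semaphore caps in-flight requests at 5, and the URLs are placeholders for illustration:

async def crawl(urls):
    sem = asyncio.Semaphore(5)  # at most 5 requests in flight
    async with ClientSession() as session:
        tasks = [rate_limited_fetch(sem, session, url) for url in urls]
        return await asyncio.gather(*tasks)

# Placeholder URLs, purely for illustration
results = asyncio.run(crawl([
    "https://example.com/api/1",
    "https://example.com/api/2",
]))
print(f"Fetched {len(results)} responses")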
IV. E-Commerce System Case Study
1. An Asynchronous Order-Processing System
class OrderProcessor:
    def __init__(self):
        self.order_queue = asyncio.Queue()
        self.is_running = False
        self.workers = []

    async def process_order(self, order):
        # Simulate order processing
        await asyncio.sleep(0.5)
        print(f"Processed order {order['id']}")
        return {"status": "completed"}

    async def save_result(self, order_id, result):
        # Placeholder for persistence (e.g. a database write)
        pass

    async def worker(self):
        while self.is_running:
            order = await self.order_queue.get()
            try:
                result = await self.process_order(order)
                await self.save_result(order['id'], result)
            except Exception as e:
                print(f"Failed to process order {order['id']}: {e}")
            finally:
                self.order_queue.task_done()

    async def start(self, worker_count=5):
        self.is_running = True
        self.workers = [asyncio.create_task(self.worker())
                        for _ in range(worker_count)]

    async def stop(self):
        self.is_running = False
        await self.order_queue.join()
        for worker in self.workers:
            worker.cancel()

# Usage example
async def main():
    processor = OrderProcessor()
    await processor.start()
    # Simulate incoming orders
    for i in range(100):
        await processor.order_queue.put({"id": i + 1})
    await asyncio.sleep(5)
    await processor.stop()

asyncio.run(main())
V. Production Best Practices
- Error handling: catch and handle exceptions inside every coroutine
- Resource limits: cap maximum concurrency with semaphores
- Performance monitoring: record execution times for critical coroutines
- Debugging: run with asyncio debug mode enabled (see the sketch below)
- Testing: drive coroutine tests with pytest-asyncio (see the sketch below)
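The last two points map directly onto code. A minimal sketch, assuming pytest and pytest-asyncio are installed and that the OrderProcessor class from section IV is importable:

import asyncio
import pytest

async def main():
    await asyncio.sleep(0.1)

if __name__ == "__main__":
    # Debug mode logs slow callbacks and never-awaited coroutines
    # (setting the PYTHONASYNCIODEBUG=1 environment variable has the same effect)
    asyncio.run(main(), debug=True)

# pytest-asyncio lets a test function itself be a coroutine
@pytest.mark.asyncio
async def test_process_order_completes():
    processor = OrderProcessor()  # the class from the order-processing example
    result = await processor.process_order({"id": 1})
    assert result["status"] == "completed"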