Python异步编程革命:AsyncIO与多模式协程实战指南
一、AsyncIO核心机制解析
突破GIL限制的事件循环模型:
import asyncio
from datetime import datetime
async def task(name, delay):
print(f"[{datetime.now()}] Task {name} started")
await asyncio.sleep(delay)
print(f"[{datetime.now()}] Task {name} finished")
return f"Result-{name}"
async def main():
# 创建多个协程任务
tasks = [
asyncio.create_task(task("A", 2)),
asyncio.create_task(task("B", 1)),
asyncio.create_task(task("C", 3))
]
# 等待首个任务完成
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
# 获取已完成任务结果
for task in done:
print(f"First completed: {task.result()}")
# 取消剩余任务
for task in pending:
task.cancel()
# 事件循环执行
asyncio.run(main())
核心组件:事件循环、协程、Future对象、任务队列
二、高级协程模式实战
1. 协程通信与同步
async def producer(queue, item):
await queue.put(item)
print(f"Produced: {item}")
async def consumer(queue):
while True:
item = await queue.get()
print(f"Consumed: {item}")
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=3)
# 启动生产者和消费者
producers = [asyncio.create_task(producer(queue, i)) for i in range(5)]
consumers = [asyncio.create_task(consumer(queue)) for _ in range(2)]
# 等待生产者完成
await asyncio.gather(*producers)
await queue.join() # 等待队列处理完成
# 取消消费者
for c in consumers:
c.cancel()
# 信号量控制并发
async def limited_task(sem, url):
async with sem:
return await fetch(url)
sem = asyncio.Semaphore(10)
tasks = [limited_task(sem, url) for url in urls]
await asyncio.gather(*tasks)
2. 协程与线程池协作
from concurrent.futures import ThreadPoolExecutor
import time
def blocking_io():
time.sleep(1) # 模拟阻塞IO
return "IO result"
async def hybrid_worker():
loop = asyncio.get_running_loop()
# 在默认线程池执行阻塞调用
result = await loop.run_in_executor(
None, # 使用默认executor
blocking_io
)
print(result)
# 使用自定义线程池
with ThreadPoolExecutor(max_workers=3) as pool:
result = await loop.run_in_executor(
pool,
blocking_io
)
print(f"Custom pool: {result}")
# 协程超时控制
async def fetch_with_timeout():
try:
async with asyncio.timeout(3.0):
await fetch_data()
except TimeoutError:
print("Request timed out")
三、高性能服务架构
1. 异步HTTP服务器
from aiohttp import web
async def handle(request):
name = request.match_info.get('name', "Anonymous")
data = await request.json()
return web.json_response({
"message": f"Hello {name}",
"your_data": data
})
app = web.Application()
app.add_routes([
web.get('/', handle),
web.post('/{name}', handle)
])
async def background_task(app):
while True:
await asyncio.sleep(10)
print("Background task running")
app.on_startup.append(lambda _: asyncio.create_task(background_task(app)))
web.run_app(app, port=8080)
2. 数据库连接池优化
import asyncpg
from asyncpg.pool import Pool
async def init_pool() -> Pool:
return await asyncpg.create_pool(
min_size=5,
max_size=20,
command_timeout=60,
host='localhost',
user='postgres'
)
async def fetch_user(pool: Pool, user_id: int):
async with pool.acquire() as conn:
return await conn.fetchrow(
"SELECT * FROM users WHERE id = $1", user_id
)
# 使用示例
pool = await init_pool()
user = await fetch_user(pool, 1)
# 事务处理示例
async def transfer_funds(pool, from_id, to_id, amount):
async with pool.acquire() as conn:
async with conn.transaction():
await conn.execute(
"UPDATE accounts SET balance = balance - $1 WHERE id = $2",
amount, from_id
)
await conn.execute(
"UPDATE accounts SET balance = balance + $1 WHERE id = $2",
amount, to_id
)
四、生产环境最佳实践
- 错误处理:为每个协程添加独立异常处理
- 性能监控:使用uvloop替代默认事件循环
- 资源限制:通过Semaphore控制最大并发
- 调试技巧:启用asyncio调试模式检测协程泄漏
- 测试方案:使用pytest-asyncio进行异步测试