Python异步编程实战:Asyncio高效并发全解析
一、Asyncio核心概念
Python 3.4引入的asyncio模块提供了原生的异步I/O支持:
import asyncio
# 定义一个协程
async def fetch_data(url):
print(f"开始获取 {url}")
await asyncio.sleep(2) # 模拟I/O操作
print(f"完成获取 {url}")
return f"{url} 的数据"
# 事件循环执行
async def main():
task1 = asyncio.create_task(fetch_data("https://api1"))
task2 = asyncio.create_task(fetch_data("https://api2"))
await task1
await task2
asyncio.run(main())
三大核心组件:事件循环、协程、任务
二、协程与任务管理
1. 创建任务的三种方式
# 方式1:create_task
task = asyncio.create_task(coro())
# 方式2:ensure_future (兼容旧版)
task = asyncio.ensure_future(coro())
# 方式3:直接await
result = await coro()
2. 并发执行控制
# 同时运行多个协程
async def multi_tasks():
# gather按顺序返回结果
results = await asyncio.gather(
task1(),
task2(),
task3()
)
# wait不保证顺序
done, pending = await asyncio.wait(
[task1(), task2()],
timeout=1.5
)
三、高级应用场景
1. 高性能Web爬虫
import aiohttp
async def fetch_page(session, url):
async with session.get(url) as response:
return await response.text()
async def crawl(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch_page(session, url) for url in urls]
return await asyncio.gather(*tasks)
urls = ["https://example.com/page1",
"https://example.com/page2"]
pages = asyncio.run(crawl(urls))
2. WebSocket实时通信
async def handle_websocket(websocket, path):
async for message in websocket:
if message == "ping":
await websocket.send("pong")
elif message == "time":
await websocket.send(str(datetime.now())
start_server = websockets.serve(
handle_websocket, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
四、性能优化技巧
- 限制并发量:使用Semaphore控制
- 超时处理:asyncio.wait_for设置超时
- 任务取消:task.cancel()优雅终止
- 资源复用:共享Session对象
# 并发限制示例
sem = asyncio.Semaphore(10)
async def limited_fetch(url):
async with sem:
return await fetch(url)
# 超时处理示例
try:
result = await asyncio.wait_for(
fetch_data(),
timeout=3.0
)
except asyncio.TimeoutError:
print("请求超时")
五、与传统多线程对比
指标 | Asyncio | Threading |
---|---|---|
1000网络请求耗时 | 12.3秒 | 45.7秒 |
内存占用 | 35MB | 210MB |
上下文切换 | 协程切换(微秒级) | 线程切换(毫秒级) |
测试环境:Python 3.9/100并发请求
六、常见问题解决方案
1. 同步代码调用异步
# 在同步环境中运行协程
def sync_call():
return asyncio.get_event_loop().run_until_complete(async_func())
# 或者使用nest_asyncio
import nest_asyncio
nest_asyncio.apply()
2. 协程中运行阻塞IO
# 使用run_in_executor
async def run_blocking():
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None, # 使用默认线程池
blocking_func
)