Mastering Python's asyncio Library: Building High-Performance Asynchronous Applications
Introduction to Asynchronous Programming

Asynchronous programming is an essential skill in modern Python development, especially for I/O-bound workloads. Through the asyncio library, Python offers native support for asynchronous programming, letting developers write high-performance, non-blocking code.

Unlike multithreading and multiprocessing, asynchronous programming handles many tasks on a single-threaded event loop. Coroutines suspend and resume to achieve concurrency, avoiding the overhead and complexity of thread switching.
Coroutine Basics

Coroutines are the core building blocks of asynchronous programming: functions whose execution can be paused and resumed.

Defining a Coroutine

import asyncio

# Define a coroutine with async def
async def simple_coroutine():
    print("Hello")
    await asyncio.sleep(1)  # Simulate an I/O operation
    print("World")

# Run the coroutine
asyncio.run(simple_coroutine())
The await Expression

async def fetch_data():
    print("Fetching data...")
    await asyncio.sleep(2)  # Simulate a network request
    print("Data fetched")
    return {"data": 123}

async def main():
    result = await fetch_data()  # Suspend until the coroutine finishes
    print(f"Received data: {result}")

asyncio.run(main())
Running Multiple Coroutines Concurrently

async def task(name, delay):
    print(f"Task {name} started")
    await asyncio.sleep(delay)
    print(f"Task {name} finished")
    return f"{name}-result"

async def main():
    # Run several tasks concurrently
    results = await asyncio.gather(
        task("A", 2),
        task("B", 1),
        task("C", 3)
    )
    print(f"All tasks finished: {results}")

asyncio.run(main())
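asyncio.gather returns results in the order the coroutines were passed in. When you would rather handle each result as soon as it is ready, asyncio.as_completed is the standard companion; a minimal sketch reusing the task coroutine above:

async def main():
    coros = [task("A", 2), task("B", 1), task("C", 3)]
    # as_completed yields awaitables in completion order, not submission order
    for finished in asyncio.as_completed(coros):
        result = await finished
        print(f"Next finished: {result}")

asyncio.run(main())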
The Event Loop in Depth

The event loop is the heart of asyncio; it schedules and executes coroutine tasks.

Getting and Managing the Event Loop

async def example_loop():
    loop = asyncio.get_running_loop()  # Get the currently running event loop
    print(f"Event loop: {loop}")
    # Hand blocking work to an executor (None means the default thread pool)
    result = await loop.run_in_executor(None, lambda: "blocking work")
    print(f"Executor result: {result}")

asyncio.run(example_loop())
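The default executor (the None argument) is a thread pool, which suits blocking I/O but does not speed up CPU-bound code because of the GIL. A minimal sketch of offloading genuinely CPU-bound work to a process pool instead; fib here is a hypothetical stand-in for your own expensive function:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def fib(n):  # hypothetical CPU-bound function
    return n if n < 2 else fib(n - 1) + fib(n - 2)

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, fib, 30)
    print(f"fib(30) = {result}")

if __name__ == "__main__":  # guard required when spawning worker processes
    asyncio.run(main())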
Customizing the Event Loop

# An event loop policy must be set before the loop is created,
# i.e. before asyncio.run() is called
try:
    # On Unix systems, uvloop can significantly improve performance
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    print("uvloop not available, falling back to the default event loop")

async def custom_loop_example():
    # asyncio is used as usual; the policy decides which loop implementation runs
    await asyncio.sleep(1)
    print("Running under the configured event loop policy")

asyncio.run(custom_loop_example())
Tasks and Future Objects

Task is a subclass of Future, used to schedule a coroutine's execution on the event loop.

Creating and Managing Tasks

async def long_running_task(seconds):
    print(f"Task started, will take {seconds} seconds")
    await asyncio.sleep(seconds)
    print("Task finished")
    return seconds

async def main():
    # Create tasks without awaiting them immediately
    task1 = asyncio.create_task(long_running_task(2))
    task2 = asyncio.create_task(long_running_task(3))
    # Do other work while they run in the background
    print("Tasks created, doing other work...")
    await asyncio.sleep(1)
    print("Other work finished")
    # Now wait for the tasks to complete
    results = await asyncio.gather(task1, task2)
    print(f"Task results: {results}")

asyncio.run(main())
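Managing tasks also means being able to stop them. A minimal sketch of cancelling a task and handling the resulting CancelledError, reusing long_running_task from above:

async def main():
    task = asyncio.create_task(long_running_task(10))
    await asyncio.sleep(1)
    task.cancel()  # request cancellation; CancelledError is raised inside the task
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

asyncio.run(main())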
Working with Future Objects

async def set_future_result(future, delay, value):
    await asyncio.sleep(delay)
    future.set_result(value)

async def future_example():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # Schedule a task that will set the future's result
    asyncio.create_task(set_future_result(future, 2, "future result"))
    print("Waiting for the future...")
    result = await future  # Suspends until the result is set
    print(f"Future result: {result}")

asyncio.run(future_example())
Timeout Handling

async def might_timeout(delay):
    await asyncio.sleep(delay)
    return "completed successfully"

async def main():
    try:
        # Give the task a 3-second deadline
        result = await asyncio.wait_for(might_timeout(5), timeout=3)
        print(f"Result: {result}")
    except asyncio.TimeoutError:
        print("Task timed out")

asyncio.run(main())
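Since Python 3.11, asyncio.timeout() offers a context-manager alternative to wait_for that can cover several awaits in one block; a minimal sketch:

async def main():
    try:
        async with asyncio.timeout(3):  # Python 3.11+
            await might_timeout(5)
    except TimeoutError:  # asyncio.timeout raises the built-in TimeoutError
        print("Block timed out")

asyncio.run(main())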
Asynchronous I/O Operations

The asyncio ecosystem supports asynchronous I/O for network communication, file access, and databases; the examples below rely on the third-party aiohttp, aiofiles, and asyncpg libraries.

Asynchronous HTTP Requests

import aiohttp
import asyncio

async def fetch_url(session, url):
    try:
        async with session.get(url) as response:
            return await response.text()
    except Exception as e:
        return f"Error: {e}"

async def main():
    urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/xml",
        "https://httpbin.org/html"
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for url, content in zip(urls, results):
            print(f"{url}: received {len(content)} characters")

asyncio.run(main())
Asynchronous File Operations

import aiofiles
import asyncio

async def async_file_operations():
    # Write to a file asynchronously
    async with aiofiles.open('test.txt', 'w') as f:
        await f.write('Hello, Async World!')
    # Read the file back asynchronously
    async with aiofiles.open('test.txt', 'r') as f:
        content = await f.read()
        print(f"File content: {content}")

asyncio.run(async_file_operations())
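For large files, reading everything into memory at once is wasteful. aiofiles file objects also support asynchronous line iteration; a brief sketch, assuming the test.txt written above exists:

async def read_lines():
    async with aiofiles.open('test.txt', 'r') as f:
        async for line in f:  # one line at a time, without blocking the loop
            print(line.rstrip())

asyncio.run(read_lines())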
Asynchronous Database Access

import asyncpg
import asyncio

async def async_database_example():
    try:
        # Connect to the database
        conn = await asyncpg.connect(
            user='user', password='password',
            database='test', host='localhost'
        )
        # Run a query
        result = await conn.fetch('SELECT * FROM users WHERE active = $1', True)
        print(f"Found {len(result)} active users")
        # Close the connection
        await conn.close()
    except Exception as e:
        print(f"Database error: {e}")

# asyncio.run(async_database_example())
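In a real service you would rarely open one connection per query. asyncpg ships a connection pool for this; a minimal sketch, assuming the same placeholder credentials and users table as above:

async def pool_example():
    pool = await asyncpg.create_pool(
        user='user', password='password',
        database='test', host='localhost',
        min_size=2, max_size=10
    )
    async with pool.acquire() as conn:  # borrow a connection from the pool
        count = await conn.fetchval('SELECT count(*) FROM users')
        print(f"Total users: {count}")
    await pool.close()

# asyncio.run(pool_example())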
Synchronization Primitives

asyncio provides several synchronization primitives for coordinating asynchronous tasks.

Locks

import asyncio

class AsyncCounter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self):
        async with self.lock:  # Acquire the lock
            current = self.value
            await asyncio.sleep(0.1)  # Simulate an I/O operation
            self.value = current + 1

async def main():
    counter = AsyncCounter()
    # Create several tasks that increment the counter concurrently
    tasks = [asyncio.create_task(counter.increment()) for _ in range(10)]
    await asyncio.gather(*tasks)
    print(f"Final counter value: {counter.value}")  # Should be 10

asyncio.run(main())
Semaphores

async def limited_resource(semaphore, name):
    async with semaphore:
        print(f"{name} acquired the resource")
        await asyncio.sleep(2)  # Simulate using the resource
        print(f"{name} released the resource")

async def main():
    semaphore = asyncio.Semaphore(3)  # Allow 3 concurrent holders
    tasks = []
    for i in range(10):
        tasks.append(limited_resource(semaphore, f"task-{i}"))
    await asyncio.gather(*tasks)

asyncio.run(main())
Events and Condition Variables

async def waiter(event, name):
    print(f"{name} waiting for the event")
    await event.wait()
    print(f"{name} saw the event")

async def setter(event):
    await asyncio.sleep(2)
    print("Setting the event")
    event.set()

async def main():
    event = asyncio.Event()
    await asyncio.gather(
        waiter(event, "A"),
        waiter(event, "B"),
        setter(event)
    )

asyncio.run(main())
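Where an Event is a one-shot broadcast, an asyncio.Condition lets waiters re-check shared state under a lock each time they are woken. A minimal producer/consumer sketch:

async def consumer(cond, items):
    async with cond:
        while not items:  # re-check the predicate after every wakeup
            await cond.wait()
        print(f"Consumed {items.pop()}")

async def producer(cond, items):
    await asyncio.sleep(1)
    async with cond:
        items.append("data")
        cond.notify_all()  # wake all waiting consumers

async def main():
    cond = asyncio.Condition()
    items = []
    await asyncio.gather(consumer(cond, items), producer(cond, items))

asyncio.run(main())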
A Practical Example: An Asynchronous Web Crawler

The following complete asynchronous web crawler shows asyncio in practice:

import aiohttp
import asyncio
from urllib.parse import urljoin, urlparse
import time

class AsyncWebCrawler:
    def __init__(self, max_concurrent=10):
        self.visited = set()
        self.to_crawl = asyncio.Queue()
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None

    async def fetch(self, url):
        async with self.semaphore:
            try:
                async with self.session.get(
                    url, timeout=aiohttp.ClientTimeout(total=5)
                ) as response:
                    if response.status == 200:
                        return await response.text()
                    return None
            except Exception as e:
                print(f"Error fetching {url}: {e}")
                return None

    def extract_links(self, html, base_url):
        # Simplified link extraction; a real crawler should use an HTML parser
        import re
        links = re.findall(r'href="([^"]+)"', html)
        full_links = set()
        for link in links:
            full_link = urljoin(base_url, link)
            # Stay on the same domain as the page being crawled
            if urlparse(full_link).netloc == urlparse(base_url).netloc:
                full_links.add(full_link)
        return full_links

    async def worker(self, name):
        while True:
            try:
                url = await self.to_crawl.get()
                if url in self.visited:
                    self.to_crawl.task_done()
                    continue
                self.visited.add(url)
                print(f"{name} crawling: {url}")
                html = await self.fetch(url)
                if html:
                    links = self.extract_links(html, url)
                    for link in links:
                        if link not in self.visited:
                            await self.to_crawl.put(link)
                self.to_crawl.task_done()
            except asyncio.CancelledError:
                break

    async def crawl(self, start_url, max_pages=50):
        self.session = aiohttp.ClientSession()
        await self.to_crawl.put(start_url)
        # Create the worker tasks
        workers = [
            asyncio.create_task(self.worker(f"Worker-{i}"))
            for i in range(self.max_concurrent)
        ]
        # Poll until the queue runs dry or the page limit is reached
        while len(self.visited) < max_pages:
            await asyncio.sleep(0.5)
            if self.to_crawl.empty():
                break
        # Cancel the workers and wait for them to exit
        for worker in workers:
            worker.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
        await self.session.close()
        return list(self.visited)[:max_pages]

async def main():
    crawler = AsyncWebCrawler(max_concurrent=5)
    start_time = time.time()
    try:
        results = await crawler.crawl("https://httpbin.org", max_pages=20)
        print(f"\nCrawl finished! Found {len(results)} pages")
        print("Sample pages:", results[:5])
    except Exception as e:
        print(f"Crawler error: {e}")
    end_time = time.time()
    print(f"Total time: {end_time - start_time:.2f} seconds")

if __name__ == "__main__":
    asyncio.run(main())
Best Practices and Performance Optimization

Following these best practices leads to more efficient and more reliable asynchronous code:

1. Avoid Blocking Calls

import asyncio
import time

# Wrong - a blocking call inside a coroutine stalls the entire event loop
async def bad_example():
    time.sleep(2)  # Blocks the event loop

# Right - use the async equivalent, or run_in_executor
async def good_example():
    await asyncio.sleep(2)  # Non-blocking

async def cpu_intensive_task():
    loop = asyncio.get_running_loop()
    # Push blocking work onto an executor
    await loop.run_in_executor(None, lambda: time.sleep(2))
2. Bound Your Concurrency

async def bounded_gather(tasks, concurrency_limit):
    semaphore = asyncio.Semaphore(concurrency_limit)

    async def bounded_task(task):
        async with semaphore:
            return await task

    return await asyncio.gather(*[bounded_task(task) for task in tasks])

# Usage example
async def example_usage():
    tasks = [asyncio.sleep(1) for _ in range(100)]  # plain coroutines, not yet scheduled
    await bounded_gather(tasks, 10)  # At most 10 run at once
3. Handle Errors and Cancellation Properly

async def robust_task():
    try:
        result = await might_fail()  # might_fail stands in for any awaitable that can raise
        return {"success": True, "data": result}
    except asyncio.CancelledError:
        print("Task was cancelled")
        raise  # Always re-raise CancelledError
    except Exception as e:
        print(f"Task failed: {e}")
        return {"success": False, "error": str(e)}

async def main():
    tasks = [robust_task() for _ in range(5)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for result in results:
        if isinstance(result, Exception):
            print(f"Exception result: {result}")
        else:
            print(f"Normal result: {result}")
4. Use Structured Concurrency

# asyncio.TaskGroup requires Python 3.11+
async def structured_concurrency_example():
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(asyncio.sleep(1))
            task2 = tg.create_task(asyncio.sleep(2))
        # Leaving the async with block means every task has finished
        print("All tasks finished")
    except* Exception as eg:
        print(f"Exceptions in the task group: {eg.exceptions}")
5. Monitor and Debug Asynchronous Code

import asyncio
import logging
import time

logging.basicConfig(level=logging.DEBUG)

async def debug_example():
    loop = asyncio.get_running_loop()
    loop.set_debug(True)  # Enable asyncio's debug mode
    # Threshold (in seconds) above which a callback or task step is logged as slow
    loop.slow_callback_duration = 0.1
    time.sleep(0.2)  # A blocking call like this triggers the slow-callback warning

asyncio.run(debug_example())
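Debug mode can also be switched on without touching the coroutine code, which is often more convenient:

# Equivalent ways to enable asyncio debug mode from the outside:
asyncio.run(debug_example(), debug=True)  # pass debug=True to asyncio.run
# or set the environment variable: PYTHONASYNCIODEBUG=1 python app.py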