1. Python Asynchronous Programming Fundamentals
1.1 Synchronous vs. Asynchronous vs. Concurrent
Understanding asynchronous programming starts with distinguishing a few key execution models:
| Model | Execution | Typical use cases | Python implementation |
| --- | --- | --- | --- |
| Synchronous | Sequential; blocks while waiting | Simple tasks, CPU-bound work | Plain functions |
| Multithreading | Concurrent execution, constrained by the GIL | IO-bound work, GUI applications | threading module |
| Multiprocessing | True parallelism, isolated resources | CPU-bound computation | multiprocessing module |
| Asynchronous | Single-threaded concurrency, non-blocking | High-concurrency IO, network applications | asyncio module |
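These models also compose: async code that must call a blocking or CPU-bound function can hand the work off to a thread or process pool instead of stalling the event loop. A minimal sketch (the two worker functions are illustrative):

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def blocking_io() -> str:
    time.sleep(1)  # A blocking call that would otherwise stall the event loop
    return "io done"

def cpu_bound(n: int) -> int:
    return sum(i * i for i in range(n))  # CPU-heavy work, GIL-bound in threads

async def main():
    # Offload blocking IO to a worker thread (Python 3.9+)
    io_result = await asyncio.to_thread(blocking_io)
    # Offload CPU-bound work to a separate process for true parallelism
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        cpu_result = await loop.run_in_executor(pool, cpu_bound, 1_000_000)
    print(io_result, cpu_result)

if __name__ == "__main__":
    asyncio.run(main())
```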
1.2 Basic Coroutine Syntax
```python
import asyncio
import time

# A traditional synchronous function
def sync_fetch_data():
    print("Starting synchronous fetch")
    time.sleep(2)  # Simulate an IO operation
    print("Synchronous fetch finished")
    return "sync data"

# An asynchronous coroutine function
async def async_fetch_data():
    print("Starting asynchronous fetch")
    await asyncio.sleep(2)  # Simulate an asynchronous IO operation
    print("Asynchronous fetch finished")
    return "async data"

# Synchronous execution example
def sync_demo():
    start_time = time.time()
    result1 = sync_fetch_data()
    result2 = sync_fetch_data()
    end_time = time.time()
    print(f"Synchronous execution took: {end_time - start_time:.2f}s")

# Asynchronous execution example
async def async_demo():
    start_time = time.time()
    # Run several coroutines concurrently
    result1, result2 = await asyncio.gather(
        async_fetch_data(),
        async_fetch_data()
    )
    end_time = time.time()
    print(f"Asynchronous execution took: {end_time - start_time:.2f}s")

if __name__ == "__main__":
    print("=== Synchronous execution ===")
    sync_demo()
    print("\n=== Asynchronous execution ===")
    asyncio.run(async_demo())
```
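On Python 3.11+, `asyncio.TaskGroup` is a structured alternative to `gather`: all tasks are awaited when the `async with` block exits, and if one task fails its siblings are cancelled. A brief sketch reusing `async_fetch_data` from above:

```python
async def async_demo_taskgroup():
    # Requires Python 3.11+
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(async_fetch_data())
        task2 = tg.create_task(async_fetch_data())
    # Both tasks are guaranteed to have finished here
    print(task1.result(), task2.result())
```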
2. asyncio Core Components in Depth
2.1 The Event Loop
```python
import asyncio
import threading
import time

class EventLoopManager:
    def __init__(self):
        self.loop = None
        self.thread = None

    def start_background_loop(self):
        """Start an event loop in a background thread"""
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        # Run the event loop until it is stopped
        self.loop.run_forever()

    def get_loop(self):
        """Get the event loop instance, starting it on first use"""
        if self.loop is None:
            self.thread = threading.Thread(target=self.start_background_loop, daemon=True)
            self.thread.start()
            # Wait for the loop to come up
            while self.loop is None:
                time.sleep(0.1)
        return self.loop

    def run_coroutine(self, coro):
        """Run a coroutine on the background loop"""
        future = asyncio.run_coroutine_threadsafe(coro, self.get_loop())
        return future.result()  # Block until the result is available

# Usage example
async def background_task():
    print("Background task started")
    await asyncio.sleep(2)
    print("Background task finished")
    return "task result"

# Schedule the background task from the main thread
manager = EventLoopManager()
result = manager.run_coroutine(background_task())
print(f"Got result: {result}")
```
2.2 Tasks and Futures
```python
import asyncio

class TaskManager:
    def __init__(self, max_concurrent: int = 5):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded_gather(self, *coroutines, return_exceptions: bool = False):
        """A gather implementation with a concurrency limit"""
        async def bounded_coroutine(coro):
            async with self.semaphore:
                return await coro
        tasks = [asyncio.create_task(bounded_coroutine(coro)) for coro in coroutines]
        return await asyncio.gather(*tasks, return_exceptions=return_exceptions)

    async def create_task_with_timeout(self, coro, timeout: float, default_result=None):
        """Run a coroutine with a timeout"""
        try:
            return await asyncio.wait_for(coro, timeout=timeout)
        except asyncio.TimeoutError:
            print(f"Task timed out, returning default: {default_result}")
            return default_result

    async def task_with_retry(self, coro_factory, max_retries: int = 3, delay: float = 1.0):
        """Run a task with retries.

        Takes a factory (a callable returning a fresh coroutine) rather than a
        coroutine object, because a coroutine can only be awaited once.
        """
        for attempt in range(max_retries):
            try:
                return await coro_factory()
            except Exception as e:
                if attempt == max_retries - 1:
                    raise
                print(f"Attempt {attempt + 1} failed: {e}, retrying in {delay}s...")
                await asyncio.sleep(delay)
                delay *= 2  # Exponential backoff

# Usage example
async def mock_api_call(item_id: int) -> dict:
    """Simulate an API call"""
    await asyncio.sleep(1)  # Simulate network latency
    if item_id % 5 == 0:  # Simulate occasional failures
        raise Exception(f"API call failed: {item_id}")
    return {"id": item_id, "data": f"data for item {item_id}"}

async def demo_task_management():
    manager = TaskManager(max_concurrent=3)
    # Create several coroutines
    tasks = [mock_api_call(i) for i in range(10)]
    # Run them under a concurrency limit
    print("Running tasks under a concurrency limit...")
    results = await manager.bounded_gather(*tasks, return_exceptions=True)
    # Split the results
    successful = [r for r in results if not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]
    print(f"Succeeded: {len(successful)}, failed: {len(failed)}")
    # A task with retries
    print("\nRunning a task with retries...")
    try:
        result = await manager.task_with_retry(
            lambda: mock_api_call(5),  # This call always fails
            max_retries=3
        )
        print(f"Retry succeeded: {result}")
    except Exception as e:
        print(f"Retries exhausted: {e}")

if __name__ == "__main__":
    asyncio.run(demo_task_management())
```
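`create_task_with_timeout` is defined above but never exercised by the demo; a short usage sketch reusing `mock_api_call`:

```python
async def demo_timeout():
    manager = TaskManager()
    # mock_api_call sleeps for 1s, so a 0.5s budget takes the timeout path
    result = await manager.create_task_with_timeout(
        mock_api_call(1),
        timeout=0.5,
        default_result={"id": 1, "data": "fallback"}
    )
    print(result)  # {'id': 1, 'data': 'fallback'}
```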
3. Advanced Asynchronous Programming Patterns
3.1 Asynchronous Context Managers
```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator

class AsyncDatabaseConnection:
    """Simulate an asynchronous database connection"""
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.is_connected = False

    async def connect(self):
        """Simulate connecting to the database"""
        print(f"Connecting to database: {self.connection_string}")
        await asyncio.sleep(0.5)
        self.is_connected = True
        print("Database connection established")

    async def disconnect(self):
        """Simulate closing the database connection"""
        if self.is_connected:
            print("Closing database connection")
            await asyncio.sleep(0.2)
            self.is_connected = False

    async def execute_query(self, query: str) -> list:
        """Execute a query"""
        if not self.is_connected:
            raise RuntimeError("Database is not connected")
        print(f"Executing query: {query}")
        await asyncio.sleep(0.3)  # Simulate query time
        return [{"id": 1, "name": "sample data"}]

    async def __aenter__(self) -> 'AsyncDatabaseConnection':
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.disconnect()

# A function-based async context manager
@asynccontextmanager
async def get_database_connection(connection_string: str) -> AsyncIterator[AsyncDatabaseConnection]:
    """Async context manager built with contextlib"""
    connection = AsyncDatabaseConnection(connection_string)
    try:
        await connection.connect()
        yield connection
    finally:
        await connection.disconnect()

async def demo_async_context():
    # Option 1: a class with __aenter__ and __aexit__
    async with AsyncDatabaseConnection("postgresql://localhost/mydb") as db:
        results = await db.execute_query("SELECT * FROM users")
        print(f"Query results: {results}")
    print("\n" + "=" * 50 + "\n")
    # Option 2: contextlib.asynccontextmanager
    async with get_database_connection("mysql://localhost/testdb") as db:
        results = await db.execute_query("SELECT * FROM products")
        print(f"Query results: {results}")

if __name__ == "__main__":
    asyncio.run(demo_async_context())
```
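When the number of resources is only known at runtime, `contextlib.AsyncExitStack` composes any number of async context managers and closes them in reverse order on exit. A sketch using the connection class above (the connection strings are illustrative):

```python
from contextlib import AsyncExitStack

async def demo_exit_stack():
    connection_strings = [
        "postgresql://localhost/db1",
        "postgresql://localhost/db2",
    ]
    async with AsyncExitStack() as stack:
        connections = [
            await stack.enter_async_context(AsyncDatabaseConnection(cs))
            for cs in connection_strings
        ]
        for db in connections:
            await db.execute_query("SELECT 1")
    # All connections are closed here, in reverse order

# asyncio.run(demo_exit_stack())
```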
3.2 Asynchronous Iterators and Generators
```python
import asyncio
from typing import AsyncIterator, List

class AsyncDataStream:
    """An asynchronous data-stream iterator"""
    def __init__(self, data_source: str, chunk_size: int = 10):
        self.data_source = data_source
        self.chunk_size = chunk_size
        self.position = 0

    def __aiter__(self) -> AsyncIterator[List[str]]:
        return self

    async def __anext__(self) -> List[str]:
        """Fetch the next chunk of data"""
        if self.position >= 100:  # Simulate the end of the data
            raise StopAsyncIteration
        # Simulate reading from the data source
        await asyncio.sleep(0.1)  # Simulate IO latency
        chunk = [
            f"{self.data_source}_item_{i}"
            for i in range(self.position, self.position + self.chunk_size)
        ]
        self.position += self.chunk_size
        return chunk

async def async_data_processor():
    """Async data-processing example"""
    stream = AsyncDataStream("user_behavior", chunk_size=5)
    async for chunk in stream:
        print(f"Processing chunk: {chunk}")
        # Simulate data processing
        await asyncio.sleep(0.2)
        # Stop early once a particular item shows up
        if "user_behavior_item_25" in chunk:
            print("Target item found, stopping iteration early")
            break

# Async generator example
async def async_data_generator(start: int, end: int, batch_size: int = 3):
    """An asynchronous data generator"""
    current = start
    while current < end:
        await asyncio.sleep(0.1)  # Simulate fetching the next batch
        yield list(range(current, min(current + batch_size, end)))
        current += batch_size

async def process_item(item) -> str:
    """Process a single data item"""
    await asyncio.sleep(0.1)
    return f"processed_{item}"

async def process_data_in_batches():
    """Consume the async generator batch by batch"""
    async for batch in async_data_generator(0, 10, batch_size=3):
        # Process the items of each batch concurrently
        results = await asyncio.gather(*(process_item(item) for item in batch))
        print(f"Batch results: {results}")

if __name__ == "__main__":
    print("=== Async iterator example ===")
    asyncio.run(async_data_processor())
    print("\n=== Async generator example ===")
    asyncio.run(process_data_in_batches())
```
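Async generators also work with async comprehensions (PEP 530), which can replace an explicit `async for` loop when you only need to collect results:

```python
async def collect_batches() -> list:
    # Gather every batch the generator yields into a list
    return [batch async for batch in async_data_generator(0, 10)]

# asyncio.run(collect_batches()) -> [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```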
4. In Practice: A High-Performance Web Crawler
4.1 Async Crawler Architecture
```python
import asyncio
import time
from dataclasses import dataclass
from typing import List, Optional, Set
from urllib.parse import urljoin, urlparse

import aiohttp
from bs4 import BeautifulSoup

@dataclass
class CrawlResult:
    url: str
    title: str
    links: List[str]
    content_length: int
    status_code: int
    crawl_time: float

class AsyncWebCrawler:
    """An asynchronous web crawler"""
    def __init__(self,
                 base_url: str,
                 max_concurrent: int = 10,
                 request_timeout: int = 10,
                 max_pages: int = 100):
        self.base_url = base_url
        self.max_concurrent = max_concurrent
        self.request_timeout = request_timeout
        self.max_pages = max_pages
        self.visited_urls: Set[str] = set()
        self.results: List[CrawlResult] = []
        self.semaphore = asyncio.Semaphore(max_concurrent)
        # Crawl statistics
        self.stats = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'start_time': None,
            'end_time': None
        }

    def is_valid_url(self, url: str) -> bool:
        """Check whether a URL is in scope"""
        parsed = urlparse(url)
        return (parsed.netloc == urlparse(self.base_url).netloc and
                parsed.scheme in ['http', 'https'])

    async def fetch_page(self, session: aiohttp.ClientSession, url: str) -> Optional[str]:
        """Fetch the contents of a page"""
        async with self.semaphore:
            try:
                timeout = aiohttp.ClientTimeout(total=self.request_timeout)
                async with session.get(url, timeout=timeout) as response:
                    self.stats['total_requests'] += 1
                    if response.status == 200:
                        self.stats['successful_requests'] += 1
                        return await response.text()
                    else:
                        self.stats['failed_requests'] += 1
                        print(f"Request failed: {url}, status: {response.status}")
                        return None
            except Exception as e:
                self.stats['failed_requests'] += 1
                print(f"Request error: {url}, error: {e}")
                return None

    def extract_links(self, html: str, base_url: str) -> List[str]:
        """Extract links from HTML"""
        soup = BeautifulSoup(html, 'html.parser')
        links = []
        for link in soup.find_all('a', href=True):
            href = link['href']
            full_url = urljoin(base_url, href)
            if self.is_valid_url(full_url) and full_url not in self.visited_urls:
                links.append(full_url)
        return links

    async def crawl_page(self, session: aiohttp.ClientSession, url: str) -> Optional[CrawlResult]:
        """Crawl a single page"""
        if (url in self.visited_urls or
                len(self.visited_urls) >= self.max_pages):
            return None
        self.visited_urls.add(url)
        print(f"Crawling: {url}")
        html = await self.fetch_page(session, url)
        if not html:
            return None
        # Parse the page
        soup = BeautifulSoup(html, 'html.parser')
        title = soup.title.string if soup.title else "Untitled"
        links = self.extract_links(html, url)
        result = CrawlResult(
            url=url,
            title=title.strip() if title else "",
            links=links,
            content_length=len(html),
            status_code=200,
            crawl_time=time.time()
        )
        self.results.append(result)
        return result

    async def start_crawling(self):
        """Run the crawl"""
        self.stats['start_time'] = time.time()
        connector = aiohttp.TCPConnector(limit=self.max_concurrent)
        async with aiohttp.ClientSession(connector=connector) as session:
            # Seed the frontier with the base URL
            tasks = {asyncio.create_task(self.crawl_page(session, self.base_url))}
            while tasks and len(self.visited_urls) < self.max_pages:
                # Wait for any task to finish
                done, tasks = await asyncio.wait(
                    tasks,
                    return_when=asyncio.FIRST_COMPLETED
                )
                # Handle the finished tasks
                for task in done:
                    result = await task
                    if result:
                        # Schedule tasks for newly discovered links
                        for link in result.links:
                            if (link not in self.visited_urls and
                                    len(self.visited_urls) < self.max_pages):
                                new_task = asyncio.create_task(
                                    self.crawl_page(session, link)
                                )
                                tasks.add(new_task)
            # Cancel anything still pending once the page budget is reached
            for task in tasks:
                task.cancel()
        self.stats['end_time'] = time.time()

    def print_stats(self):
        """Print crawl statistics"""
        duration = self.stats['end_time'] - self.stats['start_time']
        print("\n=== Crawl statistics ===")
        print(f"Total time: {duration:.2f}s")
        print(f"Total requests: {self.stats['total_requests']}")
        print(f"Successful requests: {self.stats['successful_requests']}")
        print(f"Failed requests: {self.stats['failed_requests']}")
        print(f"Pages crawled: {len(self.results)}")
        print(f"Average speed: {len(self.results)/duration:.2f} pages/s")

async def main():
    """Entry point"""
    # Note: point this at a site you are permitted to crawl
    crawler = AsyncWebCrawler(
        base_url="https://httpbin.org",  # A test site
        max_concurrent=5,
        max_pages=20
    )
    await crawler.start_crawling()
    crawler.print_stats()
    # Show the first few results
    print("\n=== First 5 results ===")
    for result in crawler.results[:5]:
        print(f"URL: {result.url}")
        print(f"Title: {result.title}")
        print(f"Links: {len(result.links)}")
        print(f"Content length: {result.content_length}")
        print("-" * 50)

if __name__ == "__main__":
    asyncio.run(main())
```
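The semaphore caps how many requests are in flight, but not the request rate. To be polite to the target site you may also want a minimum interval between requests. A simple lock-based rate-limiter sketch (the class, its parameters, and the `rate_limiter` attribute are illustrative additions, not part of the crawler above):

```python
import asyncio

class RateLimiter:
    """Enforce a minimum interval between requests across all tasks"""
    def __init__(self, requests_per_second: float = 2.0):
        self.min_interval = 1.0 / requests_per_second
        self.last_request = 0.0
        self.lock = asyncio.Lock()

    async def acquire(self):
        async with self.lock:
            now = asyncio.get_running_loop().time()
            wait = self.last_request + self.min_interval - now
            if wait > 0:
                await asyncio.sleep(wait)
            self.last_request = asyncio.get_running_loop().time()

# Possible wiring: create a RateLimiter in AsyncWebCrawler.__init__ and call
#     await self.rate_limiter.acquire()
# in fetch_page, just before session.get(...).
```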
5. Performance Optimization and Best Practices
5.1 Profiling Asynchronous Code
```python
import asyncio
import time
import cProfile
import pstats
from io import StringIO
from functools import wraps

def async_profiler(func):
    """Profiling decorator for async functions"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        pr = cProfile.Profile()
        pr.enable()
        start_time = time.time()
        result = await func(*args, **kwargs)
        end_time = time.time()
        pr.disable()
        # Print the profiling report
        s = StringIO()
        ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
        ps.print_stats(20)  # Show the top 20 lines
        print(f"Function {func.__name__} ran for {end_time - start_time:.4f}s")
        print("Profiling results:")
        print(s.getvalue())
        return result
    return wrapper

class PerformanceMonitor:
    """Async performance monitor"""
    def __init__(self):
        self.metrics = {}

    async def track_performance(self, operation_name: str, coro):
        """Track the performance of one operation"""
        start_time = time.time()
        start_memory = self.get_memory_usage()
        try:
            result = await coro
            end_time = time.time()
            end_memory = self.get_memory_usage()
            duration = end_time - start_time
            memory_used = end_memory - start_memory
            self.metrics[operation_name] = {
                'duration': duration,
                'memory_used': memory_used,
                'timestamp': start_time
            }
            print(f"{operation_name}: took {duration:.4f}s, memory delta {memory_used:.0f}KB")
            return result
        except Exception as e:
            print(f"{operation_name} failed: {e}")
            raise

    def get_memory_usage(self):
        """Get current memory usage (simplified; needs the third-party psutil package)"""
        import psutil
        process = psutil.Process()
        return process.memory_info().rss / 1024  # In KB

    def generate_report(self):
        """Generate a performance report"""
        if not self.metrics:
            print("No performance data collected")
            return
        print("\n=== Performance report ===")
        total_duration = sum(metric['duration'] for metric in self.metrics.values())
        print(f"Total execution time: {total_duration:.4f}s")
        for op_name, metric in self.metrics.items():
            percentage = (metric['duration'] / total_duration) * 100
            print(f"{op_name}: {metric['duration']:.4f}s ({percentage:.1f}%)")

# Usage example
@async_profiler
async def performance_demo():
    """Profiling demo"""
    monitor = PerformanceMonitor()

    async def mock_io_operation(duration: float, name: str):
        await asyncio.sleep(duration)
        return f"{name}_completed"

    # Track the performance of several concurrent operations
    tasks = []
    for i in range(5):
        task = monitor.track_performance(
            f"io_operation_{i}",
            mock_io_operation(0.5 + i * 0.1, f"task_{i}")
        )
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    monitor.generate_report()
    return results

if __name__ == "__main__":
    print("=== Profiling demo ===")
    asyncio.run(performance_demo())
```
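cProfile shows where CPU time goes, but it will not flag a coroutine that blocks the event loop. asyncio's debug mode does: it logs any callback or task step that runs longer than `loop.slow_callback_duration` (0.1s by default). A quick sketch:

```python
import asyncio
import time

async def suspicious():
    time.sleep(0.5)  # Blocks the loop; debug mode logs this step as slow

# debug=True enables slow-callback logging and other runtime checks
asyncio.run(suspicious(), debug=True)
```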
5.2 Best-Practice Summary
- Avoid blocking operations: never make synchronous blocking calls inside async code
- Bound your concurrency: use a Semaphore to cap concurrent work and avoid exhausting resources
- Handle errors: give every async operation appropriate exception handling
- Manage resources: use async context managers so resources are always released
- Monitor performance: continuously track the performance metrics of async applications
- Test thoroughly: use asyncio-aware test tooling for adequate async coverage (see the sketch after this list)
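As a concrete example of the last point, a minimal test using the third-party pytest-asyncio plugin might look like this (assuming `async_fetch_data` from section 1.2 lives in a module named `app`; the module name is illustrative):

```python
# test_app.py (requires: pip install pytest pytest-asyncio)
import pytest
from app import async_fetch_data

@pytest.mark.asyncio
async def test_async_fetch_data():
    result = await async_fetch_data()
    assert result == "async data"
```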