Published: 2023-11-15 | Author: Python Technical Expert
I. Introduction to Asynchronous Programming
In the traditional synchronous programming model, a program executes operations one after another; when it hits an I/O-bound task (a network request, a file read or write), the thread blocks until the operation finishes. This model scales poorly when handling a large number of concurrent requests.
Python implements non-blocking concurrency through coroutines. The asyncio library, Python's asynchronous I/O framework, provides the infrastructure for writing single-threaded concurrent code.
Synchronous vs. asynchronous execution
# Synchronous version (sequential execution)
import time

def sync_task(url):
    print(f"Starting request to {url}")
    time.sleep(1)  # simulate network latency
    print(f"Finished request to {url}")

# Running 3 tasks takes about 3 seconds
start = time.time()
for i in range(3):
    sync_task(f"http://example.com/{i}")
print(f"Synchronous execution time: {time.time() - start:.2f}s")
II. asyncio Fundamentals
1. Coroutines
A coroutine is the core building block of asynchronous programming. Any function defined with async def is a coroutine function:
import asyncio

async def simple_coroutine():
    print("Coroutine started")
    await asyncio.sleep(1)  # asynchronous wait
    print("Coroutine finished")

# Run the coroutine
asyncio.run(simple_coroutine())
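Note that calling a coroutine function does not execute its body; it only creates a coroutine object, which runs once the event loop awaits it. Continuing from the snippet above:

# Calling the coroutine function only creates a coroutine object
coro = simple_coroutine()
print(type(coro))  # <class 'coroutine'> -- nothing has run yet
asyncio.run(coro)  # the body executes once the event loop drives it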
2. The Event Loop
The event loop is the heart of asyncio; it schedules and runs coroutines:
async def main():
    # Create several tasks
    tasks = [
        asyncio.create_task(simple_coroutine())
        for _ in range(3)
    ]
    # Wait for all tasks to complete
    await asyncio.gather(*tasks)

asyncio.run(main())
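asyncio.run() creates the event loop, runs the coroutine, and closes the loop when it finishes. From inside a coroutine you can access the running loop with asyncio.get_running_loop(); its monotonic clock is convenient for timing, as we use later in main.py. A small sketch:

# Inspecting the running loop from inside a coroutine
import asyncio

async def show_loop():
    loop = asyncio.get_running_loop()       # the loop created by asyncio.run()
    print(f"Loop time: {loop.time():.2f}")  # monotonic clock, useful for measuring durations

asyncio.run(show_loop())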
III. Hands-On Project: An Asynchronous Web Crawler
Below we build a complete asynchronous web crawler that fetches several pages concurrently.
Project structure
async_crawler/
├── main.py           # main program
├── crawler.py        # core crawler class
├── utils.py          # utility functions
└── requirements.txt  # dependencies
1. Install the dependencies
# requirements.txt
aiohttp==3.8.5
beautifulsoup4==4.12.2
aiofiles==23.2.1
2. The core crawler class
# crawler.py
import aiohttp
import asyncio
from bs4 import BeautifulSoup
import aiofiles
import time

class AsyncWebCrawler:
    def __init__(self, max_concurrent=10):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results = []

    async def fetch_url(self, session, url):
        """Fetch a URL asynchronously."""
        async with self.semaphore:  # cap the number of concurrent requests
            try:
                timeout = aiohttp.ClientTimeout(total=10)
                async with session.get(url, timeout=timeout) as response:
                    if response.status == 200:
                        html = await response.text()
                        return {
                            'url': url,
                            'content': html,
                            'status': 'success',
                            'timestamp': time.time()
                        }
                    else:
                        return {
                            'url': url,
                            'content': '',
                            'status': f'error: {response.status}',
                            'timestamp': time.time()
                        }
            except Exception as e:
                return {
                    'url': url,
                    'content': '',
                    'status': f'error: {str(e)}',
                    'timestamp': time.time()
                }

    async def parse_content(self, html_content, url):
        """Parse the HTML content."""
        soup = BeautifulSoup(html_content, 'html.parser')
        title = soup.find('title')
        return {
            'url': url,
            'title': title.text if title else 'No Title',
            'links': len(soup.find_all('a')),
            'images': len(soup.find_all('img'))
        }

    async def save_result(self, data, filename):
        """Append a result to a file asynchronously."""
        async with aiofiles.open(filename, 'a', encoding='utf-8') as f:
            await f.write(f"{data}\n")

    async def crawl_urls(self, urls, output_file='results.txt'):
        """Main crawl entry point."""
        connector = aiohttp.TCPConnector(limit=self.max_concurrent)
        async with aiohttp.ClientSession(connector=connector) as session:
            tasks = [self.fetch_url(session, url) for url in urls]
            responses = await asyncio.gather(*tasks)

            # Keep only the successful responses
            successful_responses = [
                resp for resp in responses if resp['status'] == 'success'
            ]

            # Parse the fetched pages
            parse_tasks = [
                self.parse_content(resp['content'], resp['url'])
                for resp in successful_responses
            ]
            parsed_data = await asyncio.gather(*parse_tasks)

            # Save the parsed results
            save_tasks = [
                self.save_result(data, output_file)
                for data in parsed_data
            ]
            await asyncio.gather(*save_tasks)

            return {
                'total_urls': len(urls),
                'successful': len(successful_responses),
                'failed': len(urls) - len(successful_responses),
                'results': parsed_data
            }
3. The main program
# main.py
import asyncio
from crawler import AsyncWebCrawler

async def main():
    # Example URL list
    urls = [
        'https://httpbin.org/html',
        'https://httpbin.org/json',
        'https://httpbin.org/xml',
        'https://httpbin.org/redirect/1',
        'https://httpbin.org/status/200',
        'https://httpbin.org/status/404',
        'https://httpbin.org/delay/1',  # simulated delay
        'https://httpbin.org/delay/2',
    ] * 3  # repeat 3 times to increase the workload

    print(f"Crawling {len(urls)} URLs...")
    crawler = AsyncWebCrawler(max_concurrent=5)

    start_time = asyncio.get_running_loop().time()
    result = await crawler.crawl_urls(urls, 'crawler_results.txt')
    end_time = asyncio.get_running_loop().time()

    print(f"Done! Elapsed time: {end_time - start_time:.2f}s")
    print(f"Total: {result['total_urls']} URLs")
    print(f"Successful: {result['successful']}")
    print(f"Failed: {result['failed']}")

    # Show the first 5 results
    print("\nFirst 5 results:")
    for i, item in enumerate(result['results'][:5]):
        print(f"{i+1}. {item['url']} - title: {item['title'][:30]}...")

if __name__ == "__main__":
    asyncio.run(main())
IV. Performance Comparison
To quantify the difference, we compare synchronous and asynchronous versions of the crawler:
Synchronous crawler
import requests
import time

class SyncWebCrawler:
    def fetch_url(self, url):
        try:
            response = requests.get(url, timeout=10)
            return {'url': url, 'status': 'success'}
        except requests.RequestException:
            return {'url': url, 'status': 'error'}

    def crawl_urls(self, urls):
        start = time.time()
        results = [self.fetch_url(url) for url in urls]
        elapsed = time.time() - start
        successful = sum(1 for r in results if r['status'] == 'success')
        return {
            'time': elapsed,
            'successful': successful,
            'total': len(urls)
        }

# Test with 20 URLs
urls = ['https://httpbin.org/delay/1'] * 20
sync_crawler = SyncWebCrawler()
sync_result = sync_crawler.crawl_urls(urls)
print(f"Sync crawler: {sync_result}")
Benchmark results
Scenario | Sync crawler | Async crawler (concurrency = 5) | Speedup
---|---|---|---
20 URLs, 1 s delay each | ~20 s | ~4 s | 5x
50 URLs, 0.5 s delay each | ~25 s | ~5 s | 5x
V. Best Practices
1. Control the level of concurrency
# Tune the concurrency level to what the target server can handle
import asyncio
import time

async def smart_crawler():
    # Start with a moderate concurrency level
    base_concurrent = 5
    adaptive_semaphore = asyncio.Semaphore(base_concurrent)
    # Track response times so concurrency can be adjusted dynamically
    response_times = []

    async def adaptive_fetch(session, url):
        start = time.time()
        async with adaptive_semaphore:
            # ... request logic
            response_time = time.time() - start
            response_times.append(response_time)
            # If the average response time grows, reduce concurrency
            if len(response_times) > 10:
                avg_time = sum(response_times[-10:]) / 10
                if avg_time > 2.0:  # response-time threshold
                    # Note: _value is a private attribute of asyncio.Semaphore;
                    # shown here only to illustrate the idea of shrinking capacity.
                    adaptive_semaphore._value = max(1, base_concurrent - 2)
2. Error handling and retries
async def fetch_with_retry(session, url, retries=3):
    for attempt in range(retries):
        try:
            async with session.get(url) as response:
                if response.status == 200:
                    return await response.text()
                elif response.status in [429, 500, 502, 503, 504]:
                    # Transient server-side error: back off exponentially, then retry
                    wait_time = 2 ** attempt
                    await asyncio.sleep(wait_time)
                    continue
                else:
                    # Other status codes (e.g. 404) are not worth retrying
                    return None
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == retries - 1:
                raise e
            await asyncio.sleep(1)
    return None
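A hypothetical usage of fetch_with_retry (the session setup and target URL here are illustrative, not part of the original article):

# Example: drive fetch_with_retry from a short-lived session
import asyncio
import aiohttp

async def retry_demo():
    timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        html = await fetch_with_retry(session, 'https://httpbin.org/status/503', retries=3)
        print("Got content" if html else "Gave up after retries")

asyncio.run(retry_demo())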
3. Resource management
async def bounded_crawl(urls, max_concurrent=10, batch_size=50):
    """Process a large URL list in batches to avoid exhausting memory."""
    results = []
    for i in range(0, len(urls), batch_size):
        batch_urls = urls[i:i + batch_size]
        batch_results = await process_batch(batch_urls, max_concurrent)
        results.extend(batch_results)
        # Pause briefly between batches to ease the load on the server
        await asyncio.sleep(1)
    return results
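bounded_crawl delegates to a process_batch helper that is not defined above. One possible implementation (an assumption, reusing the AsyncWebCrawler class from Section III) could look like this:

# Hypothetical process_batch helper, reusing AsyncWebCrawler from crawler.py
from crawler import AsyncWebCrawler

async def process_batch(batch_urls, max_concurrent):
    crawler = AsyncWebCrawler(max_concurrent=max_concurrent)
    summary = await crawler.crawl_urls(batch_urls, 'batch_results.txt')
    return summary['results']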