Python Asynchronous Programming in Practice: Building a High-Concurrency Crawler System on Asyncio
I. Async Crawler Architecture Design
Core system components:
- Async HTTP client: built on the aiohttp library
- Task scheduler: coroutine management via asyncio
- Data pipeline: asynchronous persistence to MongoDB
- Anti-scraping strategy: automatic proxy rotation plus request rate limiting (a rate-limiter sketch follows this list)
- Monitoring: real-time crawl metrics
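The rate-limiting component can be sketched as an asyncio token bucket. This is a minimal illustration under assumed names (RateLimiter and its rate parameter are not part of the original design):

import asyncio
import time

class RateLimiter:
    """Token bucket: allows at most `rate` requests per second on average."""
    def __init__(self, rate=5):
        self.rate = rate
        self.tokens = rate
        self.updated = time.monotonic()
        self.lock = asyncio.Lock()

    async def acquire(self):
        async with self.lock:
            now = time.monotonic()
            # Refill tokens in proportion to elapsed time, capped at the burst size
            self.tokens = min(self.rate, self.tokens + (now - self.updated) * self.rate)
            self.updated = now
            if self.tokens < 1:
                # Not enough budget: wait until one full token has accrued
                await asyncio.sleep((1 - self.tokens) / self.rate)
                self.updated = time.monotonic()
                self.tokens = 1
            self.tokens -= 1

Each worker would call await limiter.acquire() immediately before issuing a request.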
II. Core Code Implementation
1. Main async crawler engine
import asyncio
from aiohttp import ClientSession, ClientTimeout
from motor.motor_asyncio import AsyncIOMotorClient

class AsyncSpider:
    def __init__(self, concurrency=10):
        # Semaphore caps the number of in-flight requests
        self.semaphore = asyncio.Semaphore(concurrency)
        self.client = AsyncIOMotorClient('mongodb://localhost:27017')
        self.db = self.client['spider_db']

    async def fetch(self, url, session):
        async with self.semaphore:
            try:
                # aiohttp expects a ClientTimeout object rather than a bare number
                async with session.get(url, timeout=ClientTimeout(total=10)) as response:
                    if response.status == 200:
                        return await response.text()
            except Exception as e:
                print(f"Error fetching {url}: {e}")
        return None

    async def parse(self, html):
        # Implement site-specific parsing logic here; return a dict for save()
        pass

    async def save(self, data):
        await self.db['items'].insert_one(data)

    async def crawl(self, urls):
        async with ClientSession() as session:
            tasks = [self.process_url(url, session) for url in urls]
            await asyncio.gather(*tasks)

    async def process_url(self, url, session):
        html = await self.fetch(url, session)
        if html:
            data = await self.parse(html)
            if data:
                await self.save(data)
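A minimal entry point for the engine might look like this (the URL list is a placeholder for illustration):

if __name__ == '__main__':
    spider = AsyncSpider(concurrency=20)
    urls = [f'https://example.com/page/{i}' for i in range(100)]
    # asyncio.run drives the event loop until every URL has been processed
    asyncio.run(spider.crawl(urls))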
2. Proxy middleware
class ProxyMiddleware:
    def __init__(self):
        self.proxies = [
            'http://proxy1.example.com:8080',
            'http://proxy2.example.com:8080'
        ]
        self.current = 0

    async def get_proxy(self):
        # Simple round-robin rotation over the proxy pool
        proxy = self.proxies[self.current]
        self.current = (self.current + 1) % len(self.proxies)
        return proxy

    async def process_request(self, url, session):
        proxy = await self.get_proxy()
        try:
            async with session.get(url, proxy=proxy) as response:
                return await response.text()
        except Exception as e:
            # Avoid a bare except: it would also swallow task cancellation
            print(f"Proxy request failed for {url}: {e}")
            return None
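One way to harden the middleware is to retry a failed request with the next proxy before giving up. A sketch under assumptions (the subclass and its max_retries parameter are illustrative, not part of the original design):

class RetryingProxyMiddleware(ProxyMiddleware):
    async def process_request(self, url, session, max_retries=3):
        # Rotate through up to max_retries proxies before giving up
        for _ in range(max_retries):
            proxy = await self.get_proxy()
            try:
                async with session.get(url, proxy=proxy) as response:
                    if response.status == 200:
                        return await response.text()
            except Exception:
                continue  # this proxy failed; try the next one
        return None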
III. Advanced Features
1. Distributed task queue
import aioredis  # in redis-py >= 4.2 this API lives at redis.asyncio

class TaskQueue:
    def __init__(self, spider):
        self.spider = spider  # the AsyncSpider instance that consumes URLs
        # decode_responses=True makes rpop return str instead of bytes
        self.redis = aioredis.from_url('redis://localhost', decode_responses=True)

    async def add_url(self, url):
        await self.redis.lpush('spider:start_urls', url)

    async def get_url(self):
        return await self.redis.rpop('spider:start_urls')

    async def process_tasks(self):
        while True:
            url = await self.get_url()
            if url:
                await self.spider.crawl([url])
            else:
                # Queue is empty; back off briefly before polling again
                await asyncio.sleep(1)
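A minimal worker entry point tying the queue to the engine (the seed URL is a placeholder):

async def main():
    spider = AsyncSpider(concurrency=20)
    queue = TaskQueue(spider)
    await queue.add_url('https://example.com/')  # seed the queue
    await queue.process_tasks()  # loops forever, polling Redis

asyncio.run(main())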
2. Performance-monitoring decorator
import time
from functools import wraps

def monitor_performance(func):
    @wraps(func)  # preserve the wrapped coroutine's name and docstring
    async def wrapper(*args, **kwargs):
        start = time.time()
        result = await func(*args, **kwargs)
        elapsed = time.time() - start
        print(f"{func.__name__} executed in {elapsed:.2f} seconds")
        return result
    return wrapper
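Usage is a one-line change on any coroutine. A self-contained demo (the sleep stands in for real network I/O):

import asyncio

@monitor_performance
async def fetch_demo():
    await asyncio.sleep(0.5)  # stand-in for a network request
    return 'done'

asyncio.run(fetch_demo())  # prints: fetch_demo executed in 0.50 seconds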
IV. Deployment
- Docker containerization: package the Python environment and dependencies
- Kubernetes orchestration: automatic scaling in and out
- Prometheus monitoring: collect runtime metrics (an exporter sketch follows this list)
- Log collection: centralized management with the ELK stack
- Scheduled jobs: periodic crawls driven by Celery
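For the Prometheus item, a minimal exporter using the official prometheus_client package could look like the sketch below (the metric names are illustrative assumptions):

from prometheus_client import Counter, Histogram, start_http_server

# Illustrative metric names; align them with your own naming scheme
PAGES_CRAWLED = Counter('spider_pages_crawled_total', 'Pages fetched successfully')
FETCH_LATENCY = Histogram('spider_fetch_seconds', 'Latency of fetch() calls')

def start_metrics_server(port=8000):
    # Exposes an HTTP /metrics endpoint for Prometheus to scrape
    start_http_server(port)

The spider would then call PAGES_CRAWLED.inc() after each successful save and wrap fetch() in FETCH_LATENCY.time() to record latencies.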