Python Black Magic: Building a High-Performance Async Crawler Monitoring System

I. System Architecture

A distributed crawler monitoring platform built on asyncio, WebSocket, and Prometheus, designed to handle 100,000+ data points per second: the async engine crawls, Prometheus counters track throughput, and a WebSocket service streams live metrics to dashboards.
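Before diving into the components, note that the Prometheus half of the pipeline needs nothing more than a scrape endpoint. A minimal sketch using prometheus_client's built-in exporter (the port number is an arbitrary choice, not something the rest of the article depends on):

from prometheus_client import start_http_server

# Expose every registered metric at http://localhost:9091/metrics in the
# standard Prometheus text format; a Prometheus server scrapes this URL.
start_http_server(9091)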
II. Core Implementation

1. Asynchronous Crawler Engine
import asyncio
from datetime import datetime

from aiohttp import ClientSession, ClientTimeout
from bs4 import BeautifulSoup
from prometheus_client import Counter

REQUESTS_TOTAL = Counter('requests_total', 'Total requests')

class AsyncSpider:
    def __init__(self, concurrency=100):
        self.semaphore = asyncio.Semaphore(concurrency)
        self.results = []
        self.errors = 0  # read by the rate-control loop in the complete example

    async def fetch(self, url, session):
        # Bound concurrency with the semaphore: one permit per in-flight request.
        async with self.semaphore:
            try:
                async with session.get(url, timeout=ClientTimeout(total=10)) as response:
                    REQUESTS_TOTAL.inc()
                    data = await response.text()
                    await self.process(url, data)
            except Exception as e:
                self.errors += 1
                print(f"Error fetching {url}: {e}")

    async def process(self, url, html):
        # Example HTML parsing: extract the page title
        soup = BeautifulSoup(html, 'lxml')
        title = soup.title.string if soup.title else ''
        self.results.append({
            'url': url,
            'title': title,
            'timestamp': datetime.now().isoformat()
        })

    async def run(self, urls):
        async with ClientSession() as session:
            tasks = [self.fetch(url, session) for url in urls]
            await asyncio.gather(*tasks)
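The engine can be exercised on its own before any monitoring is attached; a quick smoke test (the example.com URLs are placeholders):

# Crawl a small batch of pages and report the outcome.
spider = AsyncSpider(concurrency=50)
urls = [f"https://example.com/page/{i}" for i in range(10)]
asyncio.run(spider.run(urls))
print(f"fetched {len(spider.results)} pages, {spider.errors} errors")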
2. Real-Time Monitoring Service

import asyncio
import json
from datetime import datetime

from fastapi import FastAPI
from fastapi.websockets import WebSocket

app = FastAPI()

class MonitoringServer:
    def __init__(self):
        self.connections = set()
        self.metrics = {
            'requests': 0,
            'errors': 0,
            'speed': 0
        }

    async def push_metrics(self):
        # Broadcast the current metrics to every connected client once per second.
        while True:
            await asyncio.sleep(1)
            dead = set()
            for ws in self.connections:
                try:
                    await ws.send_text(json.dumps({
                        'metrics': self.metrics,
                        'timestamp': datetime.now().isoformat()
                    }))
                except Exception:
                    # Collect closed sockets; removing them while iterating
                    # would mutate the set mid-loop.
                    dead.add(ws)
            self.connections -= dead

    async def websocket_endpoint(self, websocket: WebSocket):
        await websocket.accept()
        self.connections.add(websocket)
        try:
            while True:
                await websocket.receive_text()
        except Exception:
            self.connections.discard(websocket)

monitor = MonitoringServer()
app.add_api_websocket_route("/ws", monitor.websocket_endpoint)
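Any WebSocket client can subscribe to the feed. A minimal consumer, assuming the third-party websockets package and a server running on localhost:8000 with the push loop started:

import asyncio
import json

import websockets  # pip install websockets; not part of the server above

async def watch_metrics():
    # Print each metrics frame as the server broadcasts it (once per second).
    async with websockets.connect("ws://localhost:8000/ws") as ws:
        while True:
            frame = json.loads(await ws.recv())
            print(frame['timestamp'], frame['metrics'])

asyncio.run(watch_metrics())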
III. Advanced Features

1. Adaptive Rate Limiting

import time

class AdaptiveRateLimiter:
    def __init__(self, max_rate=100):
        self.max_rate = max_rate
        self.current_rate = max_rate // 2
        self.last_adjust = time.time()

    async def adjust_rate(self, success_rate):
        now = time.time()
        if now - self.last_adjust > 60:  # adjust at most once per minute
            if success_rate > 0.95:
                self.current_rate = min(
                    self.current_rate * 1.2,
                    self.max_rate
                )
            elif success_rate < 0.8:
                self.current_rate = max(
                    self.current_rate * 0.8,
                    1
                )
            self.last_adjust = now

    async def get_delay(self):
        # Seconds to wait between requests at the current target rate.
        return 1 / self.current_rate
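This is a multiplicative-increase/multiplicative-decrease policy: the rate ramps up by 20% while the success rate stays above 95%, backs off by 20% when it drops below 80%, and get_delay() translates the rate into per-request spacing. A pacing sketch (do_request is a hypothetical stand-in for a real fetch coroutine):

import asyncio

async def paced_worker(limiter, do_request, urls):
    # Hypothetical driver: space requests by the limiter's current delay.
    for url in urls:
        await do_request(url)
        await asyncio.sleep(await limiter.get_delay())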
2. Distributed Task Queue

import redis.asyncio as redis

class DistributedQueue:
    def __init__(self, redis_url):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "crawler:queue"
        self.processing_key = "crawler:processing"

    async def add_url(self, url):
        await self.redis.lpush(self.queue_key, url)

    async def get_url(self):
        # Atomically move a URL onto the processing list so it is not lost
        # if this worker crashes before finishing it.
        url = await self.redis.rpoplpush(
            self.queue_key,
            self.processing_key
        )
        return url.decode() if url else None

    async def mark_done(self, url):
        await self.redis.lrem(self.processing_key, 0, url)
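The RPOPLPUSH move is what makes the queue crash-safe: a URL stays on the processing list until mark_done removes it, giving at-least-once delivery. A hypothetical recovery helper (requeue_stale is not part of the class above) can drain abandoned work back into the main queue at startup:

async def requeue_stale(queue: DistributedQueue):
    # Move every URL left on the processing list back onto the main queue.
    # Run this before starting workers, while nothing is mid-flight.
    while True:
        url = await queue.redis.rpoplpush(queue.processing_key, queue.queue_key)
        if url is None:
            break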
IV. Complete Example

async def main():
    # Initialize components
    spider = AsyncSpider(concurrency=200)
    limiter = AdaptiveRateLimiter(max_rate=500)
    queue = DistributedQueue("redis://localhost")

    # Seed the queue with initial URLs
    start_urls = ["https://example.com/page/" + str(i) for i in range(1000)]
    for url in start_urls:
        await queue.add_url(url)

    # Start the metrics push loop
    asyncio.create_task(monitor.push_metrics())

    # Main crawl loop; fetch() needs a shared ClientSession
    async with ClientSession() as session:
        while True:
            url = await queue.get_url()
            if not url:
                break
            await spider.fetch(url, session)
            await queue.mark_done(url)

            # Dynamically adjust the crawl rate from the observed success
            # rate, using the spider's own counters rather than reaching
            # into the Prometheus Counter's private state.
            attempts = len(spider.results) + spider.errors
            if attempts:
                await limiter.adjust_rate(len(spider.results) / attempts)
            await asyncio.sleep(await limiter.get_delay())

@app.on_event("startup")
async def start_crawler():
    # uvicorn.run() blocks, so the crawler must be scheduled as a task on
    # the server's event loop rather than via a second asyncio.run().
    asyncio.create_task(main())

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)