Published: April 5, 2024
I. System Architecture Design
This tutorial implements an enterprise-grade distributed crawler system with the following core modules:
- Intelligent scheduling center: machine-learning-based task allocation
- Dynamic rendering cluster: headless-browser automation
- Proxy IP pool: adaptive proxy quality evaluation
- Distributed storage: MongoDB sharded cluster
- Real-time monitoring: Prometheus + Grafana
Tech stack: Python 3.11 + Scrapy + Playwright + Celery + Redis + MongoDB
II. Environment Setup and Project Initialization
1. Create a virtual environment
python -m venv spider_env
source spider_env/bin/activate   # Linux/Mac
spider_env\Scripts\activate      # Windows
pip install scrapy scrapy-playwright playwright celery redis pymongo prometheus-client fake-useragent aiohttp scikit-learn
playwright install chromium      # download the browser binaries Playwright drives
2. Project directory structure
distributed_spider/
├── spiders/             # crawler core
│   ├── core/            # base components
│   ├── middlewares.py   # middlewares
│   └── spiders/         # concrete spiders
├── proxy_pool/          # proxy pool service
├── scheduler/           # distributed scheduling
├── storage/             # data storage
├── utils/               # utility functions
├── config.py            # configuration
└── monitor/             # monitoring system
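The config.py entry above is where the connection settings for Redis, MongoDB, and the monitoring exporter live. A minimal sketch might look like the following; every host, port, and key name is a placeholder to adapt to your environment.

# config.py -- minimal sketch; all hosts, ports, and key names are placeholders
REDIS_URL = 'redis://localhost:6379/0'        # Celery broker and node-status store
MONGO_URI = 'mongodb://localhost:27017'       # result storage
MONGO_DB = 'spider_db'
PROXY_POOL_KEY = 'proxy:pool'                 # Redis key holding validated proxies
PROMETHEUS_PORT = 8000                        # metrics exporter port
CONCURRENT_REQUESTS = 16
DOWNLOAD_DELAY = 1.0
RANDOMIZE_DOWNLOAD_DELAY = True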
III. Core Crawler Implementation
1. Asynchronous spider base class
# spiders/core/async_spider.py
import asyncio

from scrapy import Spider
from playwright.async_api import async_playwright


class AsyncBrowserSpider(Spider):
    custom_settings = {
        'DOWNLOAD_HANDLERS': {
            "http": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
            "https": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
        },
        # scrapy-playwright requires the asyncio-based Twisted reactor
        'TWISTED_REACTOR': "twisted.internet.asyncioreactor.AsyncioSelectorReactor",
    }

    async def parse(self, response):
        async with async_playwright() as p:
            browser = await p.chromium.launch()
            page = await browser.new_page()
            # Dynamic interaction: load the page, click "load more", wait for items
            await page.goto(response.url)
            await page.click('#load-more')
            await page.wait_for_selector('.item')
            # Extract data from the rendered DOM
            items = await page.evaluate('''() => {
                return Array.from(document.querySelectorAll('.item')).map(el => ({
                    title: el.querySelector('h3').innerText,
                    price: el.querySelector('.price').innerText
                }));
            }''')
            for item in items:
                yield item
            await browser.close()
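A concrete spider built on this base class only needs to supply its name and entry URLs and route requests through the scrapy-playwright handler. The sketch below is illustrative; the spider name, URL, and module path are placeholders.

# spiders/spiders/products.py -- illustrative subclass; name, URL, and paths are placeholders
from scrapy import Request
from spiders.core.async_spider import AsyncBrowserSpider

class ProductSpider(AsyncBrowserSpider):
    name = 'products'
    start_urls = ['https://example.com/list']

    def start_requests(self):
        for url in self.start_urls:
            # The "playwright" meta key routes the request through the
            # scrapy-playwright download handler configured in the base class
            yield Request(url, meta={'playwright': True})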
2. Intelligent anti-detection strategies
# spiders/middlewares.py
import uuid

from fake_useragent import UserAgent


class AntiDetectionMiddleware:
    def process_request(self, request, spider):
        # Rotate the User-Agent on every request
        ua = UserAgent()
        request.headers['User-Agent'] = ua.random
        # Randomized request pacing is configured via the DOWNLOAD_DELAY and
        # RANDOMIZE_DOWNLOAD_DELAY settings (writing download_latency into meta
        # has no throttling effect, so it is not done here)
        # Dynamic cookie handling
        if 'login_cookie' in (spider.custom_settings or {}):
            request.cookies = self.gen_dynamic_cookies()
        # Browser fingerprint options for Playwright-rendered requests
        if hasattr(spider, 'playwright'):
            request.meta['playwright_context_kwargs'] = {
                'user_agent': ua.random,
                'viewport': {'width': 1920, 'height': 1080},
                'record_har_path': f'hars/{request.url.split("//")[1].replace("/", "_")}.har'
            }

    def gen_dynamic_cookies(self):
        # Generate a fresh, well-formed session cookie
        return {'session_id': str(uuid.uuid4())}
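For Scrapy to invoke the middleware it has to be registered in the project settings. A settings excerpt might look like this; the module path assumes the directory layout from section II.

# settings excerpt -- module path assumes the layout from section II
DOWNLOADER_MIDDLEWARES = {
    'spiders.middlewares.AntiDetectionMiddleware': 543,
}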
IV. Distributed Task Scheduling
1. Celery task dispatch
# scheduler/tasks.py
from celery import Celery
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

app = Celery('spider_tasks',
             broker='redis://localhost:6379/0',
             backend='mongodb://localhost:27017/spider_results')


@app.task(bind=True)
def run_spider(self, spider_name, **kwargs):
    # Note: the Twisted reactor cannot be restarted inside a long-lived worker
    # process, so run workers with --max-tasks-per-child=1 (or launch the crawl
    # in a subprocess) when using CrawlerProcess like this.
    process = CrawlerProcess(get_project_settings())
    process.crawl(spider_name, **kwargs)
    # Task state tracking
    self.update_state(state='PROGRESS', meta={'status': 'crawling'})
    try:
        process.start()  # blocks until the crawl finishes
    except Exception as e:
        return {'status': 'failed', 'error': str(e)}
    return {'status': 'completed'}
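Dispatching a crawl from any client then becomes a one-liner; the spider name and keyword argument below are placeholders.

# sketch: enqueue a crawl and poll its state (spider name is a placeholder)
from scheduler.tasks import run_spider

result = run_spider.delay('products', start_url='https://example.com')
print(result.id)                 # Celery task id
print(result.state)              # PENDING / PROGRESS / SUCCESS / FAILURE
print(result.get(timeout=600))   # {'status': 'completed'} on success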
2. Dynamic load balancing
# scheduler/balancer.py
import redis
import numpy as np
from sklearn.ensemble import RandomForestRegressor


class LoadBalancer:
    def __init__(self):
        self.redis = redis.Redis()
        self.model = RandomForestRegressor()
        self.load_history = []

    def predict_optimal_node(self, task_complexity):
        # Predict the best node with the regression model
        X = np.array([x['features'] for x in self.load_history])
        y = np.array([x['duration'] for x in self.load_history])
        if len(X) > 10:  # enough training data
            self.model.fit(X, y)
            preds = [
                self.model.predict([[task_complexity, node['cpu'], node['mem']]])[0]
                for node in self.get_available_nodes()
            ]
            return int(np.argmin(preds))
        else:
            # Fall back to round-robin
            return len(self.load_history) % len(self.get_available_nodes())

    def get_available_nodes(self):
        # Read node status from a Redis hash: field = node name, value = "cpu,mem"
        nodes = []
        for name, status in self.redis.hgetall('node_status').items():
            cpu, mem = status.decode().split(',')
            nodes.append({'node': name.decode(), 'cpu': float(cpu), 'mem': float(mem)})
        return nodes
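The predictor only works if finished tasks are fed back into load_history with the same feature layout the model is trained on. The helper below is an assumed addition, not part of the original class; the [complexity, cpu, mem] feature order mirrors what predict_optimal_node expects.

# sketch: assumed feedback loop for LoadBalancer; record_result is not part of the original class
def record_result(balancer, task_complexity, node, duration):
    # Append one training sample per finished task, matching the
    # [complexity, cpu, mem] feature order used in predict_optimal_node
    balancer.load_history.append({
        'features': [task_complexity, node['cpu'], node['mem']],
        'duration': duration,
    })

balancer = LoadBalancer()
# record_result(balancer, task_complexity=3.0, node={'cpu': 0.4, 'mem': 0.6}, duration=12.5)
# best_index = balancer.predict_optimal_node(task_complexity=3.0)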
V. Intelligent Proxy IP Pool
1. Proxy quality evaluation
# proxy_pool/evaluator.py
import asyncio

from aiohttp import ClientSession, ClientTimeout, TCPConnector


class ProxyEvaluator:
    def __init__(self):
        self.proxy_queue = asyncio.Queue()
        self.valid_proxies = set()

    async def test_proxy(self, proxy):
        test_urls = [
            'http://httpbin.org/ip',
            'https://www.baidu.com',
            'https://www.google.com'
        ]
        success = 0
        async with ClientSession(connector=TCPConnector(ssl=False)) as session:
            for url in test_urls:
                try:
                    async with session.get(url, proxy=proxy,
                                           timeout=ClientTimeout(total=10)) as resp:
                        if resp.status == 200:
                            success += 1
                except Exception:
                    continue
        return success / len(test_urls)  # success rate

    async def evaluate_all(self, proxies):
        tasks = [self.test_proxy(proxy) for proxy in proxies]
        results = await asyncio.gather(*tasks)
        # Keep only high-quality proxies
        valid = [
            proxy for proxy, score in zip(proxies, results)
            if score > 0.7
        ]
        # Refresh the proxy pool
        self.update_proxy_pool(valid)
    def update_proxy_pool(self, new_proxies):
        # Refresh the in-memory pool; persisting it (e.g. to Redis) is left to the reader
        self.valid_proxies = set(new_proxies)
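Running an evaluation round is a single asyncio call; the proxy addresses below use a documentation IP range and are placeholders.

# sketch: one evaluation round (proxy addresses are placeholders)
import asyncio
from proxy_pool.evaluator import ProxyEvaluator

proxies = ['http://203.0.113.10:8080', 'http://203.0.113.11:3128']
evaluator = ProxyEvaluator()
asyncio.run(evaluator.evaluate_all(proxies))
print(evaluator.valid_proxies)   # proxies with a success rate above 0.7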
2. Dynamic proxy selection
# proxy_pool/selector.py
import math
from collections import defaultdict


class ProxySelector:
    def __init__(self):
        self.proxy_stats = defaultdict(lambda: {
            'success': 0,
            'fail': 0,
            'speed': 0
        })

    def get_proxy(self, target_domain):
        """Pick the best proxy for the target domain."""
        suitable = [
            p for p in self.proxy_stats
            if self.is_suitable(p, target_domain)
        ]
        if not suitable:
            return None
        # Upper Confidence Bound (UCB) selection
        total = sum(p['success'] + p['fail'] for p in self.proxy_stats.values())
        proxies = []
        for proxy in suitable:
            stats = self.proxy_stats[proxy]
            n = stats['success'] + stats['fail']
            if n == 0:
                score = float('inf')
            else:
                avg = stats['success'] / n
                score = avg + math.sqrt(2 * math.log(total) / n)
            proxies.append((score, proxy))
        return max(proxies, key=lambda x: x[0])[1]

    def is_suitable(self, proxy, domain):
        """Check whether the proxy fits the target domain."""
        # Region, protocol, and similar checks go here
        return True
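The UCB scores only move if request outcomes are reported back into proxy_stats. The helper below is an assumed addition (not part of the original class) showing one way to do that; the moving-average weight for speed is arbitrary.

# sketch: assumed feedback helper for ProxySelector; not part of the original class
def report_result(selector, proxy, ok, elapsed):
    stats = selector.proxy_stats[proxy]
    if ok:
        stats['success'] += 1
    else:
        stats['fail'] += 1
    # Exponential moving average of response time in seconds (weight is arbitrary)
    stats['speed'] = 0.8 * stats['speed'] + 0.2 * elapsed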
VI. Distributed Storage
1. MongoDB sharding configuration
# storage/mongo_shard.py
from pymongo import MongoClient, ASCENDING
from pymongo.errors import DuplicateKeyError


class ShardedMongoStorage:
    def __init__(self):
        # Connect through the mongos routers; sharding commands must be issued
        # via mongos, not the config servers (hostnames are placeholders)
        self.client = MongoClient(
            'mongodb://mongos1.example.com:27017,mongos2.example.com:27017/'
        )
        # Enable sharding for the database
        self.client.admin.command('enableSharding', 'spider_db')
        # Create the shard-key index
        self.client.spider_db.items.create_index(
            [('domain', ASCENDING), ('_id', ASCENDING)],
            unique=True
        )
        # Shard the collection on (domain, _id)
        self.client.admin.command(
            'shardCollection',
            'spider_db.items',
            key={'domain': 1, '_id': 1}
        )

    def insert_item(self, item):
        try:
            self.client.spider_db.items.insert_one(item)
        except DuplicateKeyError:
            self.handle_duplicate(item)

    def handle_duplicate(self, item):
        """Strategy for handling duplicate documents."""
        # Update or skip, depending on requirements
        pass
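Because the collection is sharded on (domain, _id), every document passed to insert_item must carry both fields; the item below is purely illustrative.

# sketch: inserting a document that carries the shard-key fields (values are placeholders)
storage = ShardedMongoStorage()
storage.insert_item({
    '_id': 'example.com/product/42',
    'domain': 'example.com',
    'title': 'Sample product',
    'price': 19.99,
})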
2. Data cleaning pipeline
# storage/pipelines.py
import html
import re
from datetime import datetime


class DataCleaningPipeline:
    def process_item(self, item, spider):
        # Strip HTML markup
        item['title'] = self.clean_html(item.get('title', ''))
        item['content'] = self.clean_html(item.get('content', ''))
        # Normalize dates
        if 'publish_date' in item:
            item['publish_date'] = self.normalize_date(item['publish_date'])
        # Normalize prices
        if 'price' in item:
            item['price'] = float(re.sub(r'[^\d.]', '', item['price']))
        return item

    def clean_html(self, text):
        # Unescape HTML entities, then drop tags
        text = html.unescape(text)
        return re.sub(r'<[^>]+>', '', text)

    def normalize_date(self, date_str):
        # Try several common date formats
        for fmt in ('%Y-%m-%d', '%d/%m/%Y', '%b %d, %Y'):
            try:
                return datetime.strptime(date_str, fmt).isoformat()
            except ValueError:
                continue
        return None
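As with the middleware, the pipeline has to be enabled in the Scrapy settings; the module path below assumes the layout from section II.

# settings excerpt -- module path assumes the layout from section II
ITEM_PIPELINES = {
    'storage.pipelines.DataCleaningPipeline': 300,
}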
VII. Real-Time Monitoring System
1. Prometheus metrics collection
# monitor/metrics.py
from prometheus_client import start_http_server, Counter, Gauge, Histogram


class SpiderMetrics:
    def __init__(self):
        self.requests_total = Counter(
            'spider_requests_total',
            'Total requests made',
            ['spider', 'status']
        )
        self.items_scraped = Counter(
            'spider_items_scraped',
            'Items scraped',
            ['spider']
        )
        self.response_time = Histogram(
            'spider_response_time_seconds',
            'Response time histogram',
            ['spider'],
            buckets=(0.1, 0.5, 1, 2.5, 5, 10)
        )

    def start_exporter(self, port=8000):
        start_http_server(port)

    def record_request(self, spider, status):
        self.requests_total.labels(
            spider=spider.name,
            status=status
        ).inc()

    def record_item(self, spider):
        self.items_scraped.labels(
            spider=spider.name
        ).inc()
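One way to feed these metrics is a small Scrapy extension that listens to the built-in signals. The sketch below is an assumed integration, not part of the original code; it still has to be registered under EXTENSIONS in the settings.

# monitor/extension.py -- assumed integration sketch; register it under EXTENSIONS in settings
from scrapy import signals
from monitor.metrics import SpiderMetrics

class MetricsExtension:
    def __init__(self):
        self.metrics = SpiderMetrics()

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls()
        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.item_scraped, signal=signals.item_scraped)
        crawler.signals.connect(ext.response_received, signal=signals.response_received)
        return ext

    def spider_opened(self, spider):
        self.metrics.start_exporter(port=8000)   # expose /metrics for Prometheus

    def item_scraped(self, item, response, spider):
        self.metrics.record_item(spider)

    def response_received(self, response, request, spider):
        self.metrics.record_request(spider, response.status)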
2. Grafana monitoring dashboard
# monitor/grafana/dashboard.json
{
  "title": "Spider System Monitoring",
  "panels": [
    {
      "title": "Request success rate",
      "type": "stat",
      "targets": [{
        "expr": "rate(spider_requests_total{status=~'2..'}[5m]) / rate(spider_requests_total[5m])",
        "legendFormat": "{{spider}}"
      }]
    },
    {
      "title": "Scraping throughput",
      "type": "graph",
      "targets": [{
        "expr": "rate(spider_items_scraped_total[5m])",
        "legendFormat": "{{spider}}"
      }]
    }
  ]
}
VIII. Summary and Extensions
Through this tutorial you have learned:
- Advanced asynchronous crawling in Python
- Architectural design of a distributed system
- Optimization strategies for an intelligent proxy pool
- Storage solutions for large-scale data
- A production-grade monitoring setup
Directions for further study:
- Machine-learning-based strategies against anti-crawling measures
- Containerized deployment with Kubernetes
- Data quality monitoring
- Incremental crawling and change detection