An Advanced Tutorial on Web Data Collection: From Coroutine Applications to Distributed Architecture
I. Evolution of Modern Crawler Technology and Design Principles
As anti-crawling mechanisms become more sophisticated and data-analysis needs grow, a modern crawler system needs:
- High concurrency: the ability to handle on the order of 10,000 QPS on a single machine
- Low resource consumption: efficient use of CPU and memory
- Robustness: automatic retries and exception handling (a minimal retry sketch follows this list)
- Distributed architecture: support for horizontal scaling
- Intelligent scheduling: dynamic control of request priority
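To illustrate the retry-and-exception-handling principle before diving into the full framework, here is a minimal sketch of an exponential-backoff fetch helper; the function name fetch_with_retry, its parameters, and the retried exception types are illustrative assumptions rather than part of the framework developed below.
import asyncio
import aiohttp

async def fetch_with_retry(session, url, max_retries=3, backoff=1.0):
    # Retry a GET request with exponential backoff (illustrative sketch)
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                resp.raise_for_status()
                return await resp.text()
        except (aiohttp.ClientError, asyncio.TimeoutError):
            # Wait 1s, 2s, 4s, ... before the next attempt
            await asyncio.sleep(backoff * (2 ** attempt))
    return None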
II. Core Implementation of a Coroutine-Based Asynchronous Crawler
1. An asynchronous crawler framework based on aiohttp
import aiohttp
import asyncio
from bs4 import BeautifulSoup

class AsyncCrawler:
    def __init__(self, concurrency=100):
        # Limit the number of in-flight requests with a semaphore
        self.semaphore = asyncio.Semaphore(concurrency)
        # Note: construct AsyncCrawler inside a running event loop (see main() below)
        self.session = aiohttp.ClientSession()

    async def fetch(self, url):
        async with self.semaphore:
            try:
                async with self.session.get(url, timeout=10) as response:
                    if response.status == 200:
                        return await response.text()
                    return None
            except Exception as e:
                print(f"Request failed {url}: {e}")
                return None

    async def parse(self, html):
        soup = BeautifulSoup(html, 'lxml')
        # Parsing logic goes here; a title-only placeholder is shown
        data = {'title': soup.title.string if soup.title else None}
        return data

    async def pipeline(self, data):
        # Data storage logic goes here
        pass

    async def run(self, urls):
        tasks = []
        for url in urls:
            task = asyncio.create_task(self.process_url(url))
            tasks.append(task)
        await asyncio.gather(*tasks)

    async def process_url(self, url):
        html = await self.fetch(url)
        if html:
            data = await self.parse(html)
            await self.pipeline(data)

    async def close(self):
        await self.session.close()
# Usage example
async def main():
    crawler = AsyncCrawler()
    urls = [f"https://example.com/page/{i}" for i in range(1, 100)]
    await crawler.run(urls)
    await crawler.close()

if __name__ == '__main__':
    asyncio.run(main())
2. Intelligent request rate limiting
import time

class SmartRateLimiter:
    def __init__(self, max_rate, time_window=60):
        self.max_rate = max_rate        # maximum requests allowed per window
        self.time_window = time_window  # window length in seconds
        self.request_times = []

    async def wait(self):
        now = time.time()
        # Drop timestamps that fall outside the current window
        self.request_times = [t for t in self.request_times
                              if now - t < self.time_window]
        if len(self.request_times) >= self.max_rate:
            sleep_time = self.time_window - (now - self.request_times[0])
            await asyncio.sleep(sleep_time)
            return await self.wait()
        self.request_times.append(now)
        return True

# Integrating into the crawler (inside AsyncCrawler, assuming a self.rate_limiter attribute)
async def fetch(self, url):
    await self.rate_limiter.wait()
    # ...continue with the original request logic
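To make that integration concrete, below is a minimal sketch that wires SmartRateLimiter into the AsyncCrawler from section II via a subclass; the class name RateLimitedCrawler and the default rate are assumptions for illustration.
class RateLimitedCrawler(AsyncCrawler):
    def __init__(self, concurrency=100, max_rate=120, time_window=60):
        super().__init__(concurrency)
        self.rate_limiter = SmartRateLimiter(max_rate, time_window)

    async def fetch(self, url):
        # Block until the sliding-window limiter grants a slot, then reuse the parent logic
        await self.rate_limiter.wait()
        return await super().fetch(url)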
III. Distributed Crawler Architecture Design
1. A Redis-based task queue
import redis
from hashlib import md5

class DistributedScheduler:
    def __init__(self, redis_url):
        # decode_responses=True so Redis returns str values ready for aiohttp
        self.redis = redis.from_url(redis_url, decode_responses=True)
        self.visited_key = "crawler:visited"
        self.queue_key = "crawler:queue"

    def add_url(self, url):
        # Deduplicate by URL hash before enqueueing
        url_hash = md5(url.encode()).hexdigest()
        if not self.redis.sismember(self.visited_key, url_hash):
            self.redis.lpush(self.queue_key, url)
            self.redis.sadd(self.visited_key, url_hash)
            return True
        return False

    def get_url(self):
        return self.redis.rpop(self.queue_key)

    def get_batch_urls(self, count=10):
        # Atomically read and remove up to `count` URLs from the tail of the queue
        with self.redis.pipeline() as pipe:
            pipe.multi()
            pipe.lrange(self.queue_key, -count, -1)
            pipe.ltrim(self.queue_key, 0, -count - 1)
            results = pipe.execute()
            return results[0] if results else []
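A brief usage sketch of the scheduler, assuming a local Redis instance; the URL pattern mirrors the earlier example and is purely illustrative.
scheduler = DistributedScheduler("redis://localhost:6379/0")
for i in range(1, 100):
    scheduler.add_url(f"https://example.com/page/{i}")  # enqueued only if not seen before
batch = scheduler.get_batch_urls(20)                    # atomically pops up to 20 URLs
print(f"Pulled {len(batch)} URLs from the shared queue")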
2. Worker node implementation
class WorkerNode:
    def __init__(self, scheduler, crawler):
        self.scheduler = scheduler
        self.crawler = crawler

    async def run(self):
        while True:
            urls = self.scheduler.get_batch_urls(20)
            if not urls:
                # Queue is empty; back off briefly before polling again
                await asyncio.sleep(5)
                continue
            await self.crawler.run(urls)
            # Report heartbeat
            self.report_status()

    def report_status(self):
        # Status-reporting logic goes here
        pass
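A minimal sketch of a worker entry point that ties the scheduler and the AsyncCrawler from section II together; the Redis URL and concurrency value are placeholder assumptions.
async def worker_main():
    scheduler = DistributedScheduler("redis://localhost:6379/0")
    crawler = AsyncCrawler(concurrency=50)  # session is created on the running loop
    node = WorkerNode(scheduler, crawler)
    try:
        await node.run()  # loops forever, pulling URL batches from Redis
    finally:
        await crawler.close()

if __name__ == '__main__':
    asyncio.run(worker_main())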
IV. Countering Anti-Crawling Measures in Practice
1. Dynamic User-Agent pool
from fake_useragent import UserAgent

class UserAgentManager:
    def __init__(self):
        self.ua = UserAgent()
        self.blacklist = set()

    def get_random(self):
        # Keep sampling until we get a UA string that has not been blacklisted
        while True:
            ua = self.ua.random
            if ua not in self.blacklist:
                return ua

    def mark_bad(self, ua):
        self.blacklist.add(ua)

# Use the pool when building request headers
ua_manager = UserAgentManager()
headers = {
    'User-Agent': ua_manager.get_random()
}
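One way the pool could feed back into the fetch path is sketched below, retiring a User-Agent when the server returns a blocking status; the 403/429 check and the helper name are assumptions for illustration.
async def fetch_with_ua(session, ua_manager, url):
    ua = ua_manager.get_random()
    async with session.get(url, headers={'User-Agent': ua}) as resp:
        if resp.status in (403, 429):
            # Treat an explicit block or rate-limit response as a burned UA
            ua_manager.mark_bad(ua)
            return None
        return await resp.text()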
2. Intelligent proxy IP management
class ProxyPool:
    def __init__(self, redis_url):
        # decode_responses=True so proxies come back as str, ready to pass to aiohttp
        self.redis = redis.from_url(redis_url, decode_responses=True)
        self.proxy_key = "crawler:proxies"

    async def test_proxy(self, proxy):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get('http://httpbin.org/ip',
                                       proxy=proxy,
                                       timeout=10) as resp:
                    return resp.status == 200
        except Exception:
            return False

    async def refresh_proxies(self):
        # Fetch new IPs from the proxy provider
        # (fetch_new_proxies is assumed to be implemented against your vendor's API)
        new_proxies = await self.fetch_new_proxies()
        # Test proxy availability concurrently
        tasks = []
        for proxy in new_proxies:
            tasks.append(self.test_and_add_proxy(proxy))
        await asyncio.gather(*tasks)

    async def test_and_add_proxy(self, proxy):
        if await self.test_proxy(proxy):
            self.redis.sadd(self.proxy_key, proxy)

    def get_random_proxy(self):
        return self.redis.srandmember(self.proxy_key)
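A short sketch of routing a request through the pool; the helper name is an assumption, and if the pool is empty the call falls back to a direct connection since aiohttp treats proxy=None as "no proxy".
async def fetch_via_proxy(session, proxy_pool, url):
    proxy = proxy_pool.get_random_proxy()  # None if the pool is empty
    async with session.get(url, proxy=proxy) as resp:
        return await resp.text()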
V. Data Storage and Monitoring
1. Asynchronous data pipeline
import aiomysql

class AsyncDataPipeline:
    def __init__(self, db_config):
        self.db_config = db_config
        self.pool = None

    async def connect(self):
        self.pool = await aiomysql.create_pool(**self.db_config)

    async def save_item(self, item):
        async with self.pool.acquire() as conn:
            async with conn.cursor() as cur:
                sql = """INSERT INTO crawled_data
                         (url, title, content, created_at)
                         VALUES (%s, %s, %s, NOW())"""
                await cur.execute(sql, (
                    item['url'],
                    item['title'],
                    item['content']
                ))
                await conn.commit()

    async def close(self):
        self.pool.close()
        await self.pool.wait_closed()
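A brief usage sketch; the connection parameters and the saved item below are placeholder assumptions for a local MySQL instance.
async def demo_pipeline():
    db_config = {
        'host': '127.0.0.1', 'port': 3306,
        'user': 'crawler', 'password': 'secret', 'db': 'crawl',
    }
    pipeline = AsyncDataPipeline(db_config)
    await pipeline.connect()
    await pipeline.save_item({
        'url': 'https://example.com/page/1',
        'title': 'Example title',
        'content': 'Example content',
    })
    await pipeline.close()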
2. Prometheus monitoring integration
import time
from prometheus_client import (
    start_http_server,
    Counter,
    Gauge,
    Histogram
)

# Define monitoring metrics
REQUEST_COUNT = Counter(
    'crawler_requests_total',
    'Total request count',
    ['status']
)
LATENCY = Histogram(
    'crawler_request_latency_seconds',
    'Request latency distribution',
    ['method']
)
QUEUE_SIZE = Gauge(
    'crawler_queue_size',
    'Current task queue size'
)

# Instrument the crawler's fetch method
async def fetch(self, url):
    start = time.time()
    try:
        response = await self.session.get(url)
        REQUEST_COUNT.labels(status='success').inc()
        LATENCY.labels(method='GET').observe(time.time() - start)
        return response
    except Exception:
        REQUEST_COUNT.labels(status='fail').inc()
        raise

# Start the metrics server so Prometheus can scrape port 8000
start_http_server(8000)
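The QUEUE_SIZE gauge defined above is never updated in the snippets; one way to feed it, assuming the Redis-backed DistributedScheduler from section III, is a small background task such as this sketch.
async def export_queue_size(scheduler, interval=10):
    # Periodically publish the length of the Redis task queue as the gauge value
    while True:
        QUEUE_SIZE.set(scheduler.redis.llen(scheduler.queue_key))
        await asyncio.sleep(interval)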