Python Power Tricks: Building a High-Performance Async Crawler Monitoring System

2025-07-24

I. System Architecture

A distributed crawler monitoring platform built on Asyncio + WebSocket + Prometheus, processing 100,000+ data points per second.
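Data flow: URLs move from a Redis-backed distributed queue into asyncio worker coroutines; each request updates Prometheus counters, and a FastAPI WebSocket endpoint pushes aggregated metrics to connected dashboards once per second.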

II. Core Implementation

1. Async Crawler Engine

import asyncio
from datetime import datetime

from aiohttp import ClientSession, ClientTimeout
from bs4 import BeautifulSoup
from prometheus_client import Counter

REQUESTS_TOTAL = Counter('requests_total', 'Total requests')

class AsyncSpider:
    def __init__(self, concurrency=100):
        # Semaphore caps the number of in-flight requests
        self.semaphore = asyncio.Semaphore(concurrency)
        self.queue = asyncio.Queue()
        self.results = []
        self.errors = 0  # consumed by the rate-adjustment loop later

    async def fetch(self, url, session):
        async with self.semaphore:
            try:
                async with session.get(url, timeout=ClientTimeout(total=10)) as response:
                    REQUESTS_TOTAL.inc()
                    data = await response.text()
                    await self.process(url, data)
            except Exception as e:
                self.errors += 1
                print(f"Error fetching {url}: {e}")

    async def process(self, url, html):
        # Example HTML parsing: pull out the page title
        soup = BeautifulSoup(html, 'lxml')
        title = soup.title.string if soup.title else ''
        self.results.append({
            'url': url,
            'title': title,
            'timestamp': datetime.now().isoformat()
        })

    async def run(self, urls):
        async with ClientSession() as session:
            tasks = [self.fetch(url, session) for url in urls]
            await asyncio.gather(*tasks)
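
Note that REQUESTS_TOTAL only becomes scrapeable once an exposition endpoint is running, which the listing above never starts. A minimal sketch using prometheus_client's built-in HTTP server (the port is an arbitrary choice here):

from prometheus_client import start_http_server

# Serve the default metrics registry at http://localhost:8001/metrics so
# Prometheus can scrape it; call once at process startup (port is illustrative).
start_http_server(8001)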

2. Real-Time Monitoring Service

import asyncio
import json
from datetime import datetime

from fastapi import FastAPI
from fastapi.websockets import WebSocket, WebSocketDisconnect

app = FastAPI()

class MonitoringServer:
    def __init__(self):
        self.connections = set()
        self.metrics = {
            'requests': 0,
            'errors': 0,
            'speed': 0
        }

    async def push_metrics(self):
        while True:
            await asyncio.sleep(1)
            # Iterate over a snapshot so dead connections can be removed safely
            for ws in list(self.connections):
                try:
                    await ws.send_text(json.dumps({
                        'metrics': self.metrics,
                        'timestamp': datetime.now().isoformat()
                    }))
                except Exception:
                    self.connections.discard(ws)

    async def websocket_endpoint(self, websocket: WebSocket):
        await websocket.accept()
        self.connections.add(websocket)
        try:
            while True:
                await websocket.receive_text()
        except WebSocketDisconnect:
            self.connections.discard(websocket)

monitor = MonitoringServer()
app.add_api_websocket_route("/ws", monitor.websocket_endpoint)
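
To watch the stream, any WebSocket client will do. A minimal consumer sketch, assuming the third-party websockets package is installed and the server is on localhost:8000:

import asyncio
import json

import websockets  # third-party client library, assumed installed

async def watch_metrics():
    # Print each one-second metrics push from the /ws endpoint
    async with websockets.connect("ws://localhost:8000/ws") as ws:
        while True:
            payload = json.loads(await ws.recv())
            print(payload['timestamp'], payload['metrics'])

asyncio.run(watch_metrics())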

III. Advanced Features

1. Adaptive Rate Limiting

import time

class AdaptiveRateLimiter:
    def __init__(self, max_rate=100):
        self.max_rate = max_rate
        self.current_rate = max_rate // 2
        self.last_adjust = time.time()

    async def adjust_rate(self, success_rate):
        now = time.time()
        if now - self.last_adjust > 60:  # adjust at most once per minute
            if success_rate > 0.95:
                # Healthy: ramp up by 20%, capped at max_rate
                self.current_rate = min(
                    self.current_rate * 1.2,
                    self.max_rate
                )
            elif success_rate < 0.8:
                # Degraded: back off by 20%, floored at 1 req/s
                self.current_rate = max(
                    self.current_rate * 0.8,
                    1
                )
            self.last_adjust = now

    async def get_delay(self):
        # Per-request delay in seconds implied by the current rate
        return 1 / self.current_rate
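
A quick usage sketch of the limiter's multiplicative increase/decrease behavior (the last_adjust tweak just forces the one-minute window open for the demo):

import asyncio

async def demo():
    limiter = AdaptiveRateLimiter(max_rate=100)   # starts at 50 req/s
    limiter.last_adjust -= 61                     # pretend a minute has passed
    await limiter.adjust_rate(success_rate=0.97)  # 50 -> 60 req/s
    print(await limiter.get_delay())              # ~0.0167 s between requests

asyncio.run(demo())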

2. Distributed Task Queue

import redis.asyncio as redis

class DistributedQueue:
    def __init__(self, redis_url):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "crawler:queue"
        self.processing_key = "crawler:processing"

    async def add_url(self, url):
        await self.redis.lpush(self.queue_key, url)

    async def get_url(self):
        # Atomically move a URL onto the processing list (the Redis
        # "reliable queue" pattern), so crashed workers leave a trace
        url = await self.redis.rpoplpush(
            self.queue_key,
            self.processing_key
        )
        return url.decode() if url else None

    async def mark_done(self, url):
        # Remove all occurrences of the URL from the processing list
        await self.redis.lrem(self.processing_key, 0, url)
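
One gap worth noting: URLs stranded in crawler:processing after a worker crash are never retried. A minimal recovery sketch, under the assumption that it runs at startup before any other worker is mid-flight:

async def requeue_stale(queue: DistributedQueue):
    # Drain the processing list back onto the main queue so abandoned
    # URLs get retried; only safe while no other worker is running
    while await queue.redis.rpoplpush(queue.processing_key, queue.queue_key):
        pass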

IV. Complete Example

async def main():
    # Initialize components
    spider = AsyncSpider(concurrency=200)
    limiter = AdaptiveRateLimiter(max_rate=500)
    queue = DistributedQueue("redis://localhost")

    # Seed the queue with the initial URLs
    start_urls = ["https://example.com/page/" + str(i) for i in range(1000)]
    for url in start_urls:
        await queue.add_url(url)

    # Start the metrics push loop
    asyncio.create_task(monitor.push_metrics())

    # Main crawl loop: fetch() needs a live session, so open one here
    async with ClientSession() as session:
        while True:
            url = await queue.get_url()
            if not url:
                break

            await spider.fetch(url, session)
            await queue.mark_done(url)

            # Adjust the rate from the observed success ratio
            total = len(spider.results) + spider.errors
            success_rate = len(spider.results) / total if total else 1.0
            await limiter.adjust_rate(success_rate)
            await asyncio.sleep(await limiter.get_delay())

@app.on_event("startup")
async def start_crawler():
    # uvicorn.run() blocks the main thread, so the crawler runs as a
    # startup task on the server's own event loop
    asyncio.create_task(main())

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
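
To try the full example (assuming a local Redis server on its default port 6379): start Redis, run the script, then attach a WebSocket client to ws://localhost:8000/ws to watch the metrics stream while the crawler drains the queue.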