Python异步Web爬虫与数据分析实战 – 高效采集与可视化技术指南

2025-08-27 0 126

本文将介绍如何使用Python的异步编程技术构建高效的Web爬虫,并对采集的数据进行分析和可视化。通过本教程,您将学会如何利用asyncio和aiohttp库实现并发数据采集,以及使用Pandas和Matplotlib进行数据分析

项目概述

我们将创建一个完整的异步爬虫系统,用于采集电商网站产品数据,然后对数据进行清洗、分析并生成可视化报告。这个项目涵盖了从数据采集到分析的全流程。

技术栈

  • 异步编程: asyncio, aiohttp
  • 数据解析: BeautifulSoup4, lxml
  • 数据存储: SQLite, CSV
  • 数据分析: Pandas, NumPy
  • 数据可视化: Matplotlib, Seaborn

实现步骤

1. 安装所需库

# 基础库
pip install aiohttp asyncio beautifulsoup4 lxml

# 数据分析库
pip install pandas numpy matplotlib seaborn

# 可选:用于模拟浏览器行为
pip install fake-useragent
    

2. 异步爬虫核心代码

异步爬虫类 (async_crawler.py)

import asyncio
import aiohttp
from bs4 import BeautifulSoup
import sqlite3
import json
from datetime import datetime
from fake_useragent import UserAgent

class AsyncWebCrawler:
    def __init__(self, base_url, max_concurrent=10):
        self.base_url = base_url
        self.max_concurrent = max_concurrent
        self.conn = sqlite3.connect('products.db')
        self.create_table()
        self.ua = UserAgent()
    
    def create_table(self):
        """创建数据库表"""
        cursor = self.conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT,
                price REAL,
                rating REAL,
                review_count INTEGER,
                category TEXT,
                description TEXT,
                crawled_at TIMESTAMP
            )
        ''')
        self.conn.commit()
    
    async def fetch_page(self, session, url, retries=3):
        """异步获取页面内容"""
        headers = {'User-Agent': self.ua.random}
        for attempt in range(retries):
            try:
                async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status == 200:
                        return await response.text()
                    else:
                        print(f"请求失败: {url}, 状态码: {response.status}")
            except Exception as e:
                print(f"请求异常 ({attempt+1}/{retries}): {url}, 错误: {e}")
                await asyncio.sleep(2)  # 等待后重试
        return None
    
    async def parse_product_page(self, session, url, semaphore):
        """解析产品页面"""
        async with semaphore:
            html = await self.fetch_page(session, url)
            if not html:
                return None
            
            try:
                soup = BeautifulSoup(html, 'lxml')
                
                # 提取产品信息(根据实际网站结构调整选择器)
                product_data = {
                    'title': self.extract_text(soup, 'h1.product-title'),
                    'price': self.extract_price(soup, '.price-container'),
                    'rating': self.extract_rating(soup, '.rating-stars'),
                    'review_count': self.extract_review_count(soup, '.review-count'),
                    'category': self.extract_category(soup, '.breadcrumb'),
                    'description': self.extract_description(soup, '.product-description')
                }
                
                # 保存到数据库
                self.save_to_db(product_data)
                return product_data
                
            except Exception as e:
                print(f"解析页面失败: {url}, 错误: {e}")
                return None
    
    def extract_text(self, soup, selector):
        """提取文本信息"""
        element = soup.select_one(selector)
        return element.get_text(strip=True) if element else None
    
    def extract_price(self, soup, selector):
        """提取价格"""
        element = soup.select_one(selector)
        if element:
            text = element.get_text(strip=True)
            # 移除货币符号和逗号
            text = text.replace('$', '').replace(',', '')
            try:
                return float(text)
            except ValueError:
                return None
        return None
    
    def extract_rating(self, soup, selector):
        """提取评分"""
        element = soup.select_one(selector)
        if element:
            # 尝试从类名或属性中提取评分
            classes = element.get('class', [])
            for cls in classes:
                if 'rating-' in cls:
                    try:
                        return float(cls.split('-')[-1]) / 20  # 假设格式为 rating-45 (表示4.5分)
                    except:
                        continue
        return None
    
    def extract_review_count(self, soup, selector):
        """提取评论数量"""
        element = soup.select_one(selector)
        if element:
            text = element.get_text(strip=True)
            # 提取数字
            import re
            numbers = re.findall(r'd+', text)
            return int(numbers[0]) if numbers else None
        return None
    
    def extract_category(self, soup, selector):
        """提取分类信息"""
        element = soup.select_one(selector)
        if element:
            links = element.select('a')
            if len(links) > 1:
                return links[-2].get_text(strip=True)  # 通常倒数第二个链接是分类
        return None
    
    def extract_description(self, soup, selector):
        """提取描述信息"""
        element = soup.select_one(selector)
        return element.get_text(strip=True) if element else None
    
    def save_to_db(self, product_data):
        """保存数据到数据库"""
        cursor = self.conn.cursor()
        cursor.execute('''
            INSERT INTO products (title, price, rating, review_count, category, description, crawled_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (
            product_data['title'],
            product_data['price'],
            product_data['rating'],
            product_data['review_count'],
            product_data['category'],
            product_data['description'],
            datetime.now()
        ))
        self.conn.commit()
    
    async def crawl_category(self, category_url):
        """爬取分类页面中的所有产品"""
        async with aiohttp.ClientSession() as session:
            html = await self.fetch_page(session, category_url)
            if not html:
                return
            
            soup = BeautifulSoup(html, 'lxml')
            product_links = []
            
            # 提取产品链接(根据实际网站结构调整选择器)
            for link in soup.select('.product-link a'):
                href = link.get('href')
                if href and '/product/' in href:
                    product_links.append(self.base_url + href if href.startswith('/') else href)
            
            print(f"找到 {len(product_links)} 个产品")
            
            # 使用信号量控制并发数量
            semaphore = asyncio.Semaphore(self.max_concurrent)
            tasks = []
            
            for link in product_links:
                task = asyncio.create_task(self.parse_product_page(session, link, semaphore))
                tasks.append(task)
            
            # 等待所有任务完成
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # 处理结果
            successful = sum(1 for r in results if r is not None and not isinstance(r, Exception))
            print(f"爬取完成: {successful}/{len(product_links)} 个产品成功")
    
    async def run(self, category_urls):
        """运行爬虫"""
        start_time = datetime.now()
        print(f"开始爬取: {start_time}")
        
        tasks = [self.crawl_category(url) for url in category_urls]
        await asyncio.gather(*tasks)
        
        end_time = datetime.now()
        duration = end_time - start_time
        print(f"爬取完成: {end_time}")
        print(f"总耗时: {duration}")
        
        self.conn.close()

# 使用示例
if __name__ == "__main__":
    # 示例URL(需要替换为实际目标网站)
    base_url = "https://example-store.com"
    category_urls = [
        f"{base_url}/category/electronics",
        f"{base_url}/category/books",
        f"{base_url}/category/clothing"
    ]
    
    crawler = AsyncWebCrawler(base_url, max_concurrent=5)
    asyncio.run(crawler.run(category_urls))
    

3. 数据分析与可视化

数据分析脚本 (data_analysis.py)

import sqlite3
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime

class ProductAnalyzer:
    def __init__(self, db_path='products.db'):
        self.conn = sqlite3.connect(db_path)
        self.df = self.load_data()
    
    def load_data(self):
        """从数据库加载数据"""
        query = "SELECT * FROM products"
        df = pd.read_sql_query(query, self.conn)
        
        # 数据清洗
        df = self.clean_data(df)
        return df
    
    def clean_data(self, df):
        """数据清洗"""
        # 删除重复项
        df = df.drop_duplicates(subset=['title'], keep='first')
        
        # 处理缺失值
        df['price'] = df['price'].fillna(df['price'].median())
        df['rating'] = df['rating'].fillna(df['rating'].median())
        df['review_count'] = df['review_count'].fillna(0)
        
        # 转换数据类型
        df['crawled_at'] = pd.to_datetime(df['crawled_at'])
        
        return df
    
    def basic_stats(self):
        """生成基本统计信息"""
        stats = {
            'total_products': len(self.df),
            'price_stats': self.df['price'].describe(),
            'rating_stats': self.df['rating'].describe(),
            'categories': self.df['category'].nunique(),
            'earliest_crawl': self.df['crawled_at'].min(),
            'latest_crawl': self.df['crawled_at'].max()
        }
        return stats
    
    def category_analysis(self):
        """按类别分析"""
        category_stats = self.df.groupby('category').agg({
            'price': ['mean', 'median', 'count'],
            'rating': 'mean',
            'review_count': 'sum'
        }).round(2)
        
        category_stats.columns = ['avg_price', 'median_price', 'product_count', 'avg_rating', 'total_reviews']
        return category_stats.sort_values('product_count', ascending=False)
    
    def price_analysis(self):
        """价格分析"""
        # 价格分布
        price_bins = [0, 10, 25, 50, 100, 200, 500, 1000, float('inf')]
        price_labels = ['$1000']
        
        self.df['price_range'] = pd.cut(self.df['price'], bins=price_bins, labels=price_labels)
        return self.df['price_range'].value_counts().sort_index()
    
    def correlation_analysis(self):
        """相关性分析"""
        numeric_df = self.df[['price', 'rating', 'review_count']]
        return numeric_df.corr()
    
    def generate_report(self):
        """生成分析报告"""
        print("=" * 50)
        print("产品数据分析报告")
        print("=" * 50)
        
        # 基本统计
        stats = self.basic_stats()
        print(f"n1. 基本统计:")
        print(f"   产品总数: {stats['total_products']}")
        print(f"   价格范围: ${self.df['price'].min():.2f} - ${self.df['price'].max():.2f}")
        print(f"   平均评分: {self.df['rating'].mean():.2f}/5")
        print(f"   类别数量: {stats['categories']}")
        
        # 类别分析
        print(f"n2. 按类别分析:")
        category_stats = self.category_analysis()
        print(category_stats.head(10))
        
        # 价格分布
        print(f"n3. 价格分布:")
        price_dist = self.price_analysis()
        print(price_dist)
        
        # 相关性
        print(f"n4. 相关性分析:")
        correlation = self.correlation_analysis()
        print(correlation)
    
    def create_visualizations(self):
        """创建可视化图表"""
        # 设置样式
        plt.style.use('seaborn-v0_8')
        sns.set_palette("husl")
        
        # 创建子图
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))
        fig.suptitle('产品数据分析可视化', fontsize=16, fontweight='bold')
        
        # 1. 价格分布直方图
        axes[0, 0].hist(self.df['price'], bins=30, edgecolor='black', alpha=0.7)
        axes[0, 0].set_title('价格分布')
        axes[0, 0].set_xlabel('价格')
        axes[0, 0].set_ylabel('产品数量')
        axes[0, 0].grid(True, alpha=0.3)
        
        # 2. 评分分布
        rating_counts = self.df['rating'].value_counts().sort_index()
        axes[0, 1].bar(rating_counts.index, rating_counts.values, alpha=0.7)
        axes[0, 1].set_title('评分分布')
        axes[0, 1].set_xlabel('评分')
        axes[0, 1].set_ylabel('产品数量')
        axes[0, 1].grid(True, alpha=0.3)
        
        # 3. 各类别产品数量
        top_categories = self.df['category'].value_counts().head(10)
        axes[1, 0].barh(range(len(top_categories)), top_categories.values)
        axes[1, 0].set_yticks(range(len(top_categories)))
        axes[1, 0].set_yticklabels(top_categories.index)
        axes[1, 0].set_title('Top 10 产品类别')
        axes[1, 0].set_xlabel('产品数量')
        axes[1, 0].invert_yaxis()  # 从上到下显示
        
        # 4. 价格与评分的散点图
        axes[1, 1].scatter(self.df['price'], self.df['rating'], alpha=0.6)
        axes[1, 1].set_title('价格 vs 评分')
        axes[1, 1].set_xlabel('价格')
        axes[1, 1].set_ylabel('评分')
        axes[1, 1].grid(True, alpha=0.3)
        
        # 调整布局
        plt.tight_layout()
        plt.savefig('product_analysis.png', dpi=300, bbox_inches='tight')
        plt.show()
        
        # 创建额外的高级可视化
        self.create_advanced_visualizations()
    
    def create_advanced_visualizations(self):
        """创建高级可视化"""
        # 价格与评论数的关系
        plt.figure(figsize=(10, 6))
        plt.scatter(self.df['price'], self.df['review_count'], alpha=0.6)
        plt.xscale('log')  # 使用对数尺度更好地显示数据
        plt.yscale('log')
        plt.title('价格 vs 评论数量 (对数尺度)')
        plt.xlabel('价格 (对数)')
        plt.ylabel('评论数量 (对数)')
        plt.grid(True, alpha=0.3)
        plt.savefig('price_vs_reviews.png', dpi=300, bbox_inches='tight')
        plt.show()
        
        # 类别平均价格
        category_prices = self.df.groupby('category')['price'].mean().sort_values(ascending=False).head(10)
        plt.figure(figsize=(12, 6))
        category_prices.plot(kind='bar', color='skyblue')
        plt.title('Top 10 最贵产品类别')
        plt.xlabel('类别')
        plt.ylabel('平均价格')
        plt.xticks(rotation=45, ha='right')
        plt.tight_layout()
        plt.savefig('category_prices.png', dpi=300, bbox_inches='tight')
        plt.show()

# 使用示例
if __name__ == "__main__":
    analyzer = ProductAnalyzer()
    analyzer.generate_report()
    analyzer.create_visualizations()
    

4. 主程序入口

主程序 (main.py)

import asyncio
from async_crawler import AsyncWebCrawler
from data_analysis import ProductAnalyzer
import argparse

def main():
    parser = argparse.ArgumentParser(description='异步Web爬虫与数据分析工具')
    parser.add_argument('--crawl', action='store_true', help='运行爬虫')
    parser.add_argument('--analyze', action='store_true', help='运行数据分析')
    parser.add_argument('--url', help='目标网站基础URL')
    parser.add_argument('--categories', nargs='+', help='要爬取的类别URL列表')
    args = parser.parse_args()
    
    if args.crawl:
        if not args.url or not args.categories:
            print("请提供目标URL和类别URL列表")
            return
        
        crawler = AsyncWebCrawler(args.url, max_concurrent=5)
        asyncio.run(crawler.run(args.categories))
    
    if args.analyze:
        analyzer = ProductAnalyzer()
        analyzer.generate_report()
        analyzer.create_visualizations()

if __name__ == "__main__":
    main()
    

项目特点与优势

  1. 高效异步采集:使用asyncio和aiohttp实现高并发数据采集
  2. 健壮的错误处理:包含重试机制和异常处理,提高爬虫稳定性
  3. 数据持久化:支持SQLite数据库存储,确保数据不丢失
  4. 完整数据分析:提供从数据清洗到可视化的全流程分析
  5. 可扩展架构:模块化设计,易于添加新的数据源和分析方法

使用示例

# 运行爬虫
python main.py --crawl --url "https://example-store.com" --categories "https://example-store.com/category/electronics" "https://example-store.com/category/books"

# 运行数据分析
python main.py --analyze

# 同时运行爬虫和数据分析
python main.py --crawl --analyze --url "https://example-store.com" --categories "https://example-store.com/category/electronics"
    

注意事项

  • 遵守目标网站的robots.txt协议
  • 设置合理的请求间隔,避免对目标网站造成过大压力
  • 使用代理IP和用户代理轮换,防止被封锁
  • 注意法律和版权问题,仅采集公开可用数据

进一步优化方向

要使这个爬虫系统更加完善,可以考虑以下优化:

  1. 添加代理IP支持,提高爬虫的匿名性和稳定性
  2. 实现分布式爬虫架构,使用Redis进行任务队列管理
  3. 添加机器学习算法,自动识别和提取产品信息
  4. 集成更多数据源,实现跨平台数据对比分析
  5. 开发Web界面,提供交互式数据探索功能

总结

本教程详细介绍了如何使用Python构建一个完整的异步Web爬虫和数据分析系统。通过结合asyncio、aiohttp、BeautifulSoup和Pandas等库,我们实现了一个高效、稳定的数据采集和分析管道。

这个系统不仅能够高效地采集大量网络数据,还能对数据进行深入的清洗、分析和可视化,为决策提供数据支持。无论是用于市场研究、竞争分析还是价格监控,这个工具都能提供强大的数据采集和分析能力。

通过学习本教程,您将掌握Python异步编程、Web爬虫开发和数据分析的核心技能,这些技能在数据科学和Web开发领域都具有很高的实用价值。

Python异步Web爬虫与数据分析实战 - 高效采集与可视化技术指南
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 python Python异步Web爬虫与数据分析实战 – 高效采集与可视化技术指南 https://www.taomawang.com/server/python/986.html

下一篇:

已经没有下一篇了!

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务