本文将介绍如何使用Python的异步编程技术构建高效的Web爬虫,并对采集的数据进行分析和可视化。通过本教程,您将学会如何利用asyncio和aiohttp库实现并发数据采集,以及使用Pandas和Matplotlib进行数据分析。
项目概述
我们将创建一个完整的异步爬虫系统,用于采集电商网站产品数据,然后对数据进行清洗、分析并生成可视化报告。这个项目涵盖了从数据采集到分析的全流程。
技术栈
- 异步编程: asyncio, aiohttp
- 数据解析: BeautifulSoup4, lxml
- 数据存储: SQLite, CSV
- 数据分析: Pandas, NumPy
- 数据可视化: Matplotlib, Seaborn
实现步骤
1. 安装所需库
# 基础库 pip install aiohttp asyncio beautifulsoup4 lxml # 数据分析库 pip install pandas numpy matplotlib seaborn # 可选:用于模拟浏览器行为 pip install fake-useragent
2. 异步爬虫核心代码
异步爬虫类 (async_crawler.py)
import asyncio import aiohttp from bs4 import BeautifulSoup import sqlite3 import json from datetime import datetime from fake_useragent import UserAgent class AsyncWebCrawler: def __init__(self, base_url, max_concurrent=10): self.base_url = base_url self.max_concurrent = max_concurrent self.conn = sqlite3.connect('products.db') self.create_table() self.ua = UserAgent() def create_table(self): """创建数据库表""" cursor = self.conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS products ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT, price REAL, rating REAL, review_count INTEGER, category TEXT, description TEXT, crawled_at TIMESTAMP ) ''') self.conn.commit() async def fetch_page(self, session, url, retries=3): """异步获取页面内容""" headers = {'User-Agent': self.ua.random} for attempt in range(retries): try: async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as response: if response.status == 200: return await response.text() else: print(f"请求失败: {url}, 状态码: {response.status}") except Exception as e: print(f"请求异常 ({attempt+1}/{retries}): {url}, 错误: {e}") await asyncio.sleep(2) # 等待后重试 return None async def parse_product_page(self, session, url, semaphore): """解析产品页面""" async with semaphore: html = await self.fetch_page(session, url) if not html: return None try: soup = BeautifulSoup(html, 'lxml') # 提取产品信息(根据实际网站结构调整选择器) product_data = { 'title': self.extract_text(soup, 'h1.product-title'), 'price': self.extract_price(soup, '.price-container'), 'rating': self.extract_rating(soup, '.rating-stars'), 'review_count': self.extract_review_count(soup, '.review-count'), 'category': self.extract_category(soup, '.breadcrumb'), 'description': self.extract_description(soup, '.product-description') } # 保存到数据库 self.save_to_db(product_data) return product_data except Exception as e: print(f"解析页面失败: {url}, 错误: {e}") return None def extract_text(self, soup, selector): """提取文本信息""" element = soup.select_one(selector) return element.get_text(strip=True) if element else None def extract_price(self, soup, selector): """提取价格""" element = soup.select_one(selector) if element: text = element.get_text(strip=True) # 移除货币符号和逗号 text = text.replace('$', '').replace(',', '') try: return float(text) except ValueError: return None return None def extract_rating(self, soup, selector): """提取评分""" element = soup.select_one(selector) if element: # 尝试从类名或属性中提取评分 classes = element.get('class', []) for cls in classes: if 'rating-' in cls: try: return float(cls.split('-')[-1]) / 20 # 假设格式为 rating-45 (表示4.5分) except: continue return None def extract_review_count(self, soup, selector): """提取评论数量""" element = soup.select_one(selector) if element: text = element.get_text(strip=True) # 提取数字 import re numbers = re.findall(r'd+', text) return int(numbers[0]) if numbers else None return None def extract_category(self, soup, selector): """提取分类信息""" element = soup.select_one(selector) if element: links = element.select('a') if len(links) > 1: return links[-2].get_text(strip=True) # 通常倒数第二个链接是分类 return None def extract_description(self, soup, selector): """提取描述信息""" element = soup.select_one(selector) return element.get_text(strip=True) if element else None def save_to_db(self, product_data): """保存数据到数据库""" cursor = self.conn.cursor() cursor.execute(''' INSERT INTO products (title, price, rating, review_count, category, description, crawled_at) VALUES (?, ?, ?, ?, ?, ?, ?) ''', ( product_data['title'], product_data['price'], product_data['rating'], product_data['review_count'], product_data['category'], product_data['description'], datetime.now() )) self.conn.commit() async def crawl_category(self, category_url): """爬取分类页面中的所有产品""" async with aiohttp.ClientSession() as session: html = await self.fetch_page(session, category_url) if not html: return soup = BeautifulSoup(html, 'lxml') product_links = [] # 提取产品链接(根据实际网站结构调整选择器) for link in soup.select('.product-link a'): href = link.get('href') if href and '/product/' in href: product_links.append(self.base_url + href if href.startswith('/') else href) print(f"找到 {len(product_links)} 个产品") # 使用信号量控制并发数量 semaphore = asyncio.Semaphore(self.max_concurrent) tasks = [] for link in product_links: task = asyncio.create_task(self.parse_product_page(session, link, semaphore)) tasks.append(task) # 等待所有任务完成 results = await asyncio.gather(*tasks, return_exceptions=True) # 处理结果 successful = sum(1 for r in results if r is not None and not isinstance(r, Exception)) print(f"爬取完成: {successful}/{len(product_links)} 个产品成功") async def run(self, category_urls): """运行爬虫""" start_time = datetime.now() print(f"开始爬取: {start_time}") tasks = [self.crawl_category(url) for url in category_urls] await asyncio.gather(*tasks) end_time = datetime.now() duration = end_time - start_time print(f"爬取完成: {end_time}") print(f"总耗时: {duration}") self.conn.close() # 使用示例 if __name__ == "__main__": # 示例URL(需要替换为实际目标网站) base_url = "https://example-store.com" category_urls = [ f"{base_url}/category/electronics", f"{base_url}/category/books", f"{base_url}/category/clothing" ] crawler = AsyncWebCrawler(base_url, max_concurrent=5) asyncio.run(crawler.run(category_urls))
3. 数据分析与可视化
数据分析脚本 (data_analysis.py)
import sqlite3 import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns from datetime import datetime class ProductAnalyzer: def __init__(self, db_path='products.db'): self.conn = sqlite3.connect(db_path) self.df = self.load_data() def load_data(self): """从数据库加载数据""" query = "SELECT * FROM products" df = pd.read_sql_query(query, self.conn) # 数据清洗 df = self.clean_data(df) return df def clean_data(self, df): """数据清洗""" # 删除重复项 df = df.drop_duplicates(subset=['title'], keep='first') # 处理缺失值 df['price'] = df['price'].fillna(df['price'].median()) df['rating'] = df['rating'].fillna(df['rating'].median()) df['review_count'] = df['review_count'].fillna(0) # 转换数据类型 df['crawled_at'] = pd.to_datetime(df['crawled_at']) return df def basic_stats(self): """生成基本统计信息""" stats = { 'total_products': len(self.df), 'price_stats': self.df['price'].describe(), 'rating_stats': self.df['rating'].describe(), 'categories': self.df['category'].nunique(), 'earliest_crawl': self.df['crawled_at'].min(), 'latest_crawl': self.df['crawled_at'].max() } return stats def category_analysis(self): """按类别分析""" category_stats = self.df.groupby('category').agg({ 'price': ['mean', 'median', 'count'], 'rating': 'mean', 'review_count': 'sum' }).round(2) category_stats.columns = ['avg_price', 'median_price', 'product_count', 'avg_rating', 'total_reviews'] return category_stats.sort_values('product_count', ascending=False) def price_analysis(self): """价格分析""" # 价格分布 price_bins = [0, 10, 25, 50, 100, 200, 500, 1000, float('inf')] price_labels = ['$1000'] self.df['price_range'] = pd.cut(self.df['price'], bins=price_bins, labels=price_labels) return self.df['price_range'].value_counts().sort_index() def correlation_analysis(self): """相关性分析""" numeric_df = self.df[['price', 'rating', 'review_count']] return numeric_df.corr() def generate_report(self): """生成分析报告""" print("=" * 50) print("产品数据分析报告") print("=" * 50) # 基本统计 stats = self.basic_stats() print(f"n1. 基本统计:") print(f" 产品总数: {stats['total_products']}") print(f" 价格范围: ${self.df['price'].min():.2f} - ${self.df['price'].max():.2f}") print(f" 平均评分: {self.df['rating'].mean():.2f}/5") print(f" 类别数量: {stats['categories']}") # 类别分析 print(f"n2. 按类别分析:") category_stats = self.category_analysis() print(category_stats.head(10)) # 价格分布 print(f"n3. 价格分布:") price_dist = self.price_analysis() print(price_dist) # 相关性 print(f"n4. 相关性分析:") correlation = self.correlation_analysis() print(correlation) def create_visualizations(self): """创建可视化图表""" # 设置样式 plt.style.use('seaborn-v0_8') sns.set_palette("husl") # 创建子图 fig, axes = plt.subplots(2, 2, figsize=(15, 12)) fig.suptitle('产品数据分析可视化', fontsize=16, fontweight='bold') # 1. 价格分布直方图 axes[0, 0].hist(self.df['price'], bins=30, edgecolor='black', alpha=0.7) axes[0, 0].set_title('价格分布') axes[0, 0].set_xlabel('价格') axes[0, 0].set_ylabel('产品数量') axes[0, 0].grid(True, alpha=0.3) # 2. 评分分布 rating_counts = self.df['rating'].value_counts().sort_index() axes[0, 1].bar(rating_counts.index, rating_counts.values, alpha=0.7) axes[0, 1].set_title('评分分布') axes[0, 1].set_xlabel('评分') axes[0, 1].set_ylabel('产品数量') axes[0, 1].grid(True, alpha=0.3) # 3. 各类别产品数量 top_categories = self.df['category'].value_counts().head(10) axes[1, 0].barh(range(len(top_categories)), top_categories.values) axes[1, 0].set_yticks(range(len(top_categories))) axes[1, 0].set_yticklabels(top_categories.index) axes[1, 0].set_title('Top 10 产品类别') axes[1, 0].set_xlabel('产品数量') axes[1, 0].invert_yaxis() # 从上到下显示 # 4. 价格与评分的散点图 axes[1, 1].scatter(self.df['price'], self.df['rating'], alpha=0.6) axes[1, 1].set_title('价格 vs 评分') axes[1, 1].set_xlabel('价格') axes[1, 1].set_ylabel('评分') axes[1, 1].grid(True, alpha=0.3) # 调整布局 plt.tight_layout() plt.savefig('product_analysis.png', dpi=300, bbox_inches='tight') plt.show() # 创建额外的高级可视化 self.create_advanced_visualizations() def create_advanced_visualizations(self): """创建高级可视化""" # 价格与评论数的关系 plt.figure(figsize=(10, 6)) plt.scatter(self.df['price'], self.df['review_count'], alpha=0.6) plt.xscale('log') # 使用对数尺度更好地显示数据 plt.yscale('log') plt.title('价格 vs 评论数量 (对数尺度)') plt.xlabel('价格 (对数)') plt.ylabel('评论数量 (对数)') plt.grid(True, alpha=0.3) plt.savefig('price_vs_reviews.png', dpi=300, bbox_inches='tight') plt.show() # 类别平均价格 category_prices = self.df.groupby('category')['price'].mean().sort_values(ascending=False).head(10) plt.figure(figsize=(12, 6)) category_prices.plot(kind='bar', color='skyblue') plt.title('Top 10 最贵产品类别') plt.xlabel('类别') plt.ylabel('平均价格') plt.xticks(rotation=45, ha='right') plt.tight_layout() plt.savefig('category_prices.png', dpi=300, bbox_inches='tight') plt.show() # 使用示例 if __name__ == "__main__": analyzer = ProductAnalyzer() analyzer.generate_report() analyzer.create_visualizations()
4. 主程序入口
主程序 (main.py)
import asyncio from async_crawler import AsyncWebCrawler from data_analysis import ProductAnalyzer import argparse def main(): parser = argparse.ArgumentParser(description='异步Web爬虫与数据分析工具') parser.add_argument('--crawl', action='store_true', help='运行爬虫') parser.add_argument('--analyze', action='store_true', help='运行数据分析') parser.add_argument('--url', help='目标网站基础URL') parser.add_argument('--categories', nargs='+', help='要爬取的类别URL列表') args = parser.parse_args() if args.crawl: if not args.url or not args.categories: print("请提供目标URL和类别URL列表") return crawler = AsyncWebCrawler(args.url, max_concurrent=5) asyncio.run(crawler.run(args.categories)) if args.analyze: analyzer = ProductAnalyzer() analyzer.generate_report() analyzer.create_visualizations() if __name__ == "__main__": main()
项目特点与优势
- 高效异步采集:使用asyncio和aiohttp实现高并发数据采集
- 健壮的错误处理:包含重试机制和异常处理,提高爬虫稳定性
- 数据持久化:支持SQLite数据库存储,确保数据不丢失
- 完整数据分析:提供从数据清洗到可视化的全流程分析
- 可扩展架构:模块化设计,易于添加新的数据源和分析方法
使用示例
# 运行爬虫 python main.py --crawl --url "https://example-store.com" --categories "https://example-store.com/category/electronics" "https://example-store.com/category/books" # 运行数据分析 python main.py --analyze # 同时运行爬虫和数据分析 python main.py --crawl --analyze --url "https://example-store.com" --categories "https://example-store.com/category/electronics"
注意事项
- 遵守目标网站的robots.txt协议
- 设置合理的请求间隔,避免对目标网站造成过大压力
- 使用代理IP和用户代理轮换,防止被封锁
- 注意法律和版权问题,仅采集公开可用数据
进一步优化方向
要使这个爬虫系统更加完善,可以考虑以下优化:
- 添加代理IP支持,提高爬虫的匿名性和稳定性
- 实现分布式爬虫架构,使用Redis进行任务队列管理
- 添加机器学习算法,自动识别和提取产品信息
- 集成更多数据源,实现跨平台数据对比分析
- 开发Web界面,提供交互式数据探索功能
总结
本教程详细介绍了如何使用Python构建一个完整的异步Web爬虫和数据分析系统。通过结合asyncio、aiohttp、BeautifulSoup和Pandas等库,我们实现了一个高效、稳定的数据采集和分析管道。
这个系统不仅能够高效地采集大量网络数据,还能对数据进行深入的清洗、分析和可视化,为决策提供数据支持。无论是用于市场研究、竞争分析还是价格监控,这个工具都能提供强大的数据采集和分析能力。
通过学习本教程,您将掌握Python异步编程、Web爬虫开发和数据分析的核心技能,这些技能在数据科学和Web开发领域都具有很高的实用价值。