本文将介绍如何使用Python的异步编程技术构建高效的Web爬虫,并对采集的数据进行分析和可视化。通过本教程,您将学会如何利用asyncio和aiohttp库实现并发数据采集,以及使用Pandas和Matplotlib进行数据分析。
项目概述
我们将创建一个完整的异步爬虫系统,用于采集电商网站产品数据,然后对数据进行清洗、分析并生成可视化报告。这个项目涵盖了从数据采集到分析的全流程。
技术栈
- 异步编程: asyncio, aiohttp
- 数据解析: BeautifulSoup4, lxml
- 数据存储: SQLite, CSV
- 数据分析: Pandas, NumPy
- 数据可视化: Matplotlib, Seaborn
实现步骤
1. 安装所需库
# 基础库
pip install aiohttp asyncio beautifulsoup4 lxml
# 数据分析库
pip install pandas numpy matplotlib seaborn
# 可选:用于模拟浏览器行为
pip install fake-useragent
2. 异步爬虫核心代码
异步爬虫类 (async_crawler.py)
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import sqlite3
import json
from datetime import datetime
from fake_useragent import UserAgent
class AsyncWebCrawler:
def __init__(self, base_url, max_concurrent=10):
self.base_url = base_url
self.max_concurrent = max_concurrent
self.conn = sqlite3.connect('products.db')
self.create_table()
self.ua = UserAgent()
def create_table(self):
"""创建数据库表"""
cursor = self.conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
price REAL,
rating REAL,
review_count INTEGER,
category TEXT,
description TEXT,
crawled_at TIMESTAMP
)
''')
self.conn.commit()
async def fetch_page(self, session, url, retries=3):
"""异步获取页面内容"""
headers = {'User-Agent': self.ua.random}
for attempt in range(retries):
try:
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as response:
if response.status == 200:
return await response.text()
else:
print(f"请求失败: {url}, 状态码: {response.status}")
except Exception as e:
print(f"请求异常 ({attempt+1}/{retries}): {url}, 错误: {e}")
await asyncio.sleep(2) # 等待后重试
return None
async def parse_product_page(self, session, url, semaphore):
"""解析产品页面"""
async with semaphore:
html = await self.fetch_page(session, url)
if not html:
return None
try:
soup = BeautifulSoup(html, 'lxml')
# 提取产品信息(根据实际网站结构调整选择器)
product_data = {
'title': self.extract_text(soup, 'h1.product-title'),
'price': self.extract_price(soup, '.price-container'),
'rating': self.extract_rating(soup, '.rating-stars'),
'review_count': self.extract_review_count(soup, '.review-count'),
'category': self.extract_category(soup, '.breadcrumb'),
'description': self.extract_description(soup, '.product-description')
}
# 保存到数据库
self.save_to_db(product_data)
return product_data
except Exception as e:
print(f"解析页面失败: {url}, 错误: {e}")
return None
def extract_text(self, soup, selector):
"""提取文本信息"""
element = soup.select_one(selector)
return element.get_text(strip=True) if element else None
def extract_price(self, soup, selector):
"""提取价格"""
element = soup.select_one(selector)
if element:
text = element.get_text(strip=True)
# 移除货币符号和逗号
text = text.replace('$', '').replace(',', '')
try:
return float(text)
except ValueError:
return None
return None
def extract_rating(self, soup, selector):
"""提取评分"""
element = soup.select_one(selector)
if element:
# 尝试从类名或属性中提取评分
classes = element.get('class', [])
for cls in classes:
if 'rating-' in cls:
try:
return float(cls.split('-')[-1]) / 20 # 假设格式为 rating-45 (表示4.5分)
except:
continue
return None
def extract_review_count(self, soup, selector):
"""提取评论数量"""
element = soup.select_one(selector)
if element:
text = element.get_text(strip=True)
# 提取数字
import re
numbers = re.findall(r'd+', text)
return int(numbers[0]) if numbers else None
return None
def extract_category(self, soup, selector):
"""提取分类信息"""
element = soup.select_one(selector)
if element:
links = element.select('a')
if len(links) > 1:
return links[-2].get_text(strip=True) # 通常倒数第二个链接是分类
return None
def extract_description(self, soup, selector):
"""提取描述信息"""
element = soup.select_one(selector)
return element.get_text(strip=True) if element else None
def save_to_db(self, product_data):
"""保存数据到数据库"""
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO products (title, price, rating, review_count, category, description, crawled_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
product_data['title'],
product_data['price'],
product_data['rating'],
product_data['review_count'],
product_data['category'],
product_data['description'],
datetime.now()
))
self.conn.commit()
async def crawl_category(self, category_url):
"""爬取分类页面中的所有产品"""
async with aiohttp.ClientSession() as session:
html = await self.fetch_page(session, category_url)
if not html:
return
soup = BeautifulSoup(html, 'lxml')
product_links = []
# 提取产品链接(根据实际网站结构调整选择器)
for link in soup.select('.product-link a'):
href = link.get('href')
if href and '/product/' in href:
product_links.append(self.base_url + href if href.startswith('/') else href)
print(f"找到 {len(product_links)} 个产品")
# 使用信号量控制并发数量
semaphore = asyncio.Semaphore(self.max_concurrent)
tasks = []
for link in product_links:
task = asyncio.create_task(self.parse_product_page(session, link, semaphore))
tasks.append(task)
# 等待所有任务完成
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
successful = sum(1 for r in results if r is not None and not isinstance(r, Exception))
print(f"爬取完成: {successful}/{len(product_links)} 个产品成功")
async def run(self, category_urls):
"""运行爬虫"""
start_time = datetime.now()
print(f"开始爬取: {start_time}")
tasks = [self.crawl_category(url) for url in category_urls]
await asyncio.gather(*tasks)
end_time = datetime.now()
duration = end_time - start_time
print(f"爬取完成: {end_time}")
print(f"总耗时: {duration}")
self.conn.close()
# 使用示例
if __name__ == "__main__":
# 示例URL(需要替换为实际目标网站)
base_url = "https://example-store.com"
category_urls = [
f"{base_url}/category/electronics",
f"{base_url}/category/books",
f"{base_url}/category/clothing"
]
crawler = AsyncWebCrawler(base_url, max_concurrent=5)
asyncio.run(crawler.run(category_urls))
3. 数据分析与可视化
数据分析脚本 (data_analysis.py)
import sqlite3
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
class ProductAnalyzer:
def __init__(self, db_path='products.db'):
self.conn = sqlite3.connect(db_path)
self.df = self.load_data()
def load_data(self):
"""从数据库加载数据"""
query = "SELECT * FROM products"
df = pd.read_sql_query(query, self.conn)
# 数据清洗
df = self.clean_data(df)
return df
def clean_data(self, df):
"""数据清洗"""
# 删除重复项
df = df.drop_duplicates(subset=['title'], keep='first')
# 处理缺失值
df['price'] = df['price'].fillna(df['price'].median())
df['rating'] = df['rating'].fillna(df['rating'].median())
df['review_count'] = df['review_count'].fillna(0)
# 转换数据类型
df['crawled_at'] = pd.to_datetime(df['crawled_at'])
return df
def basic_stats(self):
"""生成基本统计信息"""
stats = {
'total_products': len(self.df),
'price_stats': self.df['price'].describe(),
'rating_stats': self.df['rating'].describe(),
'categories': self.df['category'].nunique(),
'earliest_crawl': self.df['crawled_at'].min(),
'latest_crawl': self.df['crawled_at'].max()
}
return stats
def category_analysis(self):
"""按类别分析"""
category_stats = self.df.groupby('category').agg({
'price': ['mean', 'median', 'count'],
'rating': 'mean',
'review_count': 'sum'
}).round(2)
category_stats.columns = ['avg_price', 'median_price', 'product_count', 'avg_rating', 'total_reviews']
return category_stats.sort_values('product_count', ascending=False)
def price_analysis(self):
"""价格分析"""
# 价格分布
price_bins = [0, 10, 25, 50, 100, 200, 500, 1000, float('inf')]
price_labels = ['$1000']
self.df['price_range'] = pd.cut(self.df['price'], bins=price_bins, labels=price_labels)
return self.df['price_range'].value_counts().sort_index()
def correlation_analysis(self):
"""相关性分析"""
numeric_df = self.df[['price', 'rating', 'review_count']]
return numeric_df.corr()
def generate_report(self):
"""生成分析报告"""
print("=" * 50)
print("产品数据分析报告")
print("=" * 50)
# 基本统计
stats = self.basic_stats()
print(f"n1. 基本统计:")
print(f" 产品总数: {stats['total_products']}")
print(f" 价格范围: ${self.df['price'].min():.2f} - ${self.df['price'].max():.2f}")
print(f" 平均评分: {self.df['rating'].mean():.2f}/5")
print(f" 类别数量: {stats['categories']}")
# 类别分析
print(f"n2. 按类别分析:")
category_stats = self.category_analysis()
print(category_stats.head(10))
# 价格分布
print(f"n3. 价格分布:")
price_dist = self.price_analysis()
print(price_dist)
# 相关性
print(f"n4. 相关性分析:")
correlation = self.correlation_analysis()
print(correlation)
def create_visualizations(self):
"""创建可视化图表"""
# 设置样式
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
# 创建子图
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('产品数据分析可视化', fontsize=16, fontweight='bold')
# 1. 价格分布直方图
axes[0, 0].hist(self.df['price'], bins=30, edgecolor='black', alpha=0.7)
axes[0, 0].set_title('价格分布')
axes[0, 0].set_xlabel('价格')
axes[0, 0].set_ylabel('产品数量')
axes[0, 0].grid(True, alpha=0.3)
# 2. 评分分布
rating_counts = self.df['rating'].value_counts().sort_index()
axes[0, 1].bar(rating_counts.index, rating_counts.values, alpha=0.7)
axes[0, 1].set_title('评分分布')
axes[0, 1].set_xlabel('评分')
axes[0, 1].set_ylabel('产品数量')
axes[0, 1].grid(True, alpha=0.3)
# 3. 各类别产品数量
top_categories = self.df['category'].value_counts().head(10)
axes[1, 0].barh(range(len(top_categories)), top_categories.values)
axes[1, 0].set_yticks(range(len(top_categories)))
axes[1, 0].set_yticklabels(top_categories.index)
axes[1, 0].set_title('Top 10 产品类别')
axes[1, 0].set_xlabel('产品数量')
axes[1, 0].invert_yaxis() # 从上到下显示
# 4. 价格与评分的散点图
axes[1, 1].scatter(self.df['price'], self.df['rating'], alpha=0.6)
axes[1, 1].set_title('价格 vs 评分')
axes[1, 1].set_xlabel('价格')
axes[1, 1].set_ylabel('评分')
axes[1, 1].grid(True, alpha=0.3)
# 调整布局
plt.tight_layout()
plt.savefig('product_analysis.png', dpi=300, bbox_inches='tight')
plt.show()
# 创建额外的高级可视化
self.create_advanced_visualizations()
def create_advanced_visualizations(self):
"""创建高级可视化"""
# 价格与评论数的关系
plt.figure(figsize=(10, 6))
plt.scatter(self.df['price'], self.df['review_count'], alpha=0.6)
plt.xscale('log') # 使用对数尺度更好地显示数据
plt.yscale('log')
plt.title('价格 vs 评论数量 (对数尺度)')
plt.xlabel('价格 (对数)')
plt.ylabel('评论数量 (对数)')
plt.grid(True, alpha=0.3)
plt.savefig('price_vs_reviews.png', dpi=300, bbox_inches='tight')
plt.show()
# 类别平均价格
category_prices = self.df.groupby('category')['price'].mean().sort_values(ascending=False).head(10)
plt.figure(figsize=(12, 6))
category_prices.plot(kind='bar', color='skyblue')
plt.title('Top 10 最贵产品类别')
plt.xlabel('类别')
plt.ylabel('平均价格')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.savefig('category_prices.png', dpi=300, bbox_inches='tight')
plt.show()
# 使用示例
if __name__ == "__main__":
analyzer = ProductAnalyzer()
analyzer.generate_report()
analyzer.create_visualizations()
4. 主程序入口
主程序 (main.py)
import asyncio
from async_crawler import AsyncWebCrawler
from data_analysis import ProductAnalyzer
import argparse
def main():
parser = argparse.ArgumentParser(description='异步Web爬虫与数据分析工具')
parser.add_argument('--crawl', action='store_true', help='运行爬虫')
parser.add_argument('--analyze', action='store_true', help='运行数据分析')
parser.add_argument('--url', help='目标网站基础URL')
parser.add_argument('--categories', nargs='+', help='要爬取的类别URL列表')
args = parser.parse_args()
if args.crawl:
if not args.url or not args.categories:
print("请提供目标URL和类别URL列表")
return
crawler = AsyncWebCrawler(args.url, max_concurrent=5)
asyncio.run(crawler.run(args.categories))
if args.analyze:
analyzer = ProductAnalyzer()
analyzer.generate_report()
analyzer.create_visualizations()
if __name__ == "__main__":
main()
项目特点与优势
- 高效异步采集:使用asyncio和aiohttp实现高并发数据采集
- 健壮的错误处理:包含重试机制和异常处理,提高爬虫稳定性
- 数据持久化:支持SQLite数据库存储,确保数据不丢失
- 完整数据分析:提供从数据清洗到可视化的全流程分析
- 可扩展架构:模块化设计,易于添加新的数据源和分析方法
使用示例
# 运行爬虫
python main.py --crawl --url "https://example-store.com" --categories "https://example-store.com/category/electronics" "https://example-store.com/category/books"
# 运行数据分析
python main.py --analyze
# 同时运行爬虫和数据分析
python main.py --crawl --analyze --url "https://example-store.com" --categories "https://example-store.com/category/electronics"
注意事项
- 遵守目标网站的robots.txt协议
- 设置合理的请求间隔,避免对目标网站造成过大压力
- 使用代理IP和用户代理轮换,防止被封锁
- 注意法律和版权问题,仅采集公开可用数据
进一步优化方向
要使这个爬虫系统更加完善,可以考虑以下优化:
- 添加代理IP支持,提高爬虫的匿名性和稳定性
- 实现分布式爬虫架构,使用Redis进行任务队列管理
- 添加机器学习算法,自动识别和提取产品信息
- 集成更多数据源,实现跨平台数据对比分析
- 开发Web界面,提供交互式数据探索功能
总结
本教程详细介绍了如何使用Python构建一个完整的异步Web爬虫和数据分析系统。通过结合asyncio、aiohttp、BeautifulSoup和Pandas等库,我们实现了一个高效、稳定的数据采集和分析管道。
这个系统不仅能够高效地采集大量网络数据,还能对数据进行深入的清洗、分析和可视化,为决策提供数据支持。无论是用于市场研究、竞争分析还是价格监控,这个工具都能提供强大的数据采集和分析能力。
通过学习本教程,您将掌握Python异步编程、Web爬虫开发和数据分析的核心技能,这些技能在数据科学和Web开发领域都具有很高的实用价值。

