Publication date: June 10, 2024
I. System Architecture Design
This tutorial builds a complete data analysis system. The main functional modules are:
- Distributed crawler: data collection with Scrapy-Redis
- ETL pipeline: data processing orchestrated with Apache Airflow
- Feature engineering: large-scale data processing with PySpark
- Machine learning: automated model training with scikit-learn, tracked with MLflow
- Visualization dashboard: analysis results served with Flask + ECharts
Tech stack: Python 3.9 + Scrapy + Airflow + PySpark + Flask + scikit-learn + MLflow
II. Environment Setup and Project Initialization
1. Create a virtual environment
# Create a Python virtual environment
python -m venv data-env
# Activate the environment
source data-env/bin/activate   # Linux/macOS
data-env\Scripts\activate      # Windows
# Install core dependencies
pip install scrapy scrapy-redis apache-airflow pyspark flask flask-sqlalchemy pandas scikit-learn mlflow psycopg2-binary
2. Project directory structure
data-system/
├── crawler/              # Crawler module
│   ├── spiders/          # Spider implementations
│   └── pipelines.py      # Item pipelines
├── etl/                  # Data processing
│   ├── dags/             # Airflow DAGs
│   └── operators/        # Custom operators
├── feature/              # Feature engineering
│   ├── spark/            # PySpark scripts
│   └── processor.py      # Feature processor
├── model/                # Machine learning
│   ├── trainer.py        # Model training
│   └── predictor.py      # Prediction service
├── web/                  # Visualization dashboard
│   ├── app.py            # Flask application
│   └── templates/        # Front-end templates
└── config.py             # Global configuration (sketched below)
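The tree above references a config.py for global configuration that the tutorial never shows. A minimal sketch of what it might contain, assuming the Redis and PostgreSQL connection strings used in later sections (the variable names are illustrative, not part of the original project):

# config.py -- hypothetical global configuration; names are illustrative
import os

# Redis instance shared by the Scrapy-Redis scheduler and the raw-data queue
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")

# PostgreSQL warehouse used by the ETL load step and the Flask dashboard
POSTGRES_URI = os.environ.get(
    "POSTGRES_URI", "postgresql://user:pass@localhost:5432/data_warehouse"
)

# Output location of the PySpark feature-engineering job
FEATURE_PATH = os.environ.get("FEATURE_PATH", "/data/features/product_features")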
III. Distributed Crawler
1. Scrapy-Redis configuration
# crawler/settings.py
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
REDIS_URL = 'redis://localhost:6379/0'
# Distributed crawling options
SCHEDULER_PERSIST = True
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
# Item pipeline that pushes scraped items into Redis
ITEM_PIPELINES = {
    'crawler.pipelines.RedisPipeline': 300,
}
# crawler/pipelines.py
import json

class RedisPipeline:
    def process_item(self, item, spider):
        # RedisSpider exposes its Redis connection as spider.server
        redis_conn = spider.server
        redis_conn.lpush('raw_data', json.dumps(dict(item)))
        return item
2. Example: e-commerce product spider
# crawler/spiders/product_spider.py
from scrapy_redis.spiders import RedisSpider

class ProductSpider(RedisSpider):
    name = 'product'
    redis_key = 'product:start_urls'

    def parse(self, response):
        for product in response.css('.product-item'):
            yield {
                'name': product.css('h3::text').get(),
                'price': float(product.css('.price::text').re_first(r'\d+\.\d+')),
                'sales': int(product.css('.sales::text').re_first(r'\d+')),
                'shop': product.css('.shop::attr(title)').get(),
            }
        # Follow pagination links
        next_page = response.css('.next-page::attr(href)').get()
        if next_page:
            yield response.follow(next_page, self.parse)
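A RedisSpider stays idle until a start URL appears under its redis_key, so the queue has to be seeded once. A minimal sketch using the redis-py client (the URL below is a placeholder, not from the original tutorial):

# seed_start_urls.py -- hypothetical helper to push an initial URL into the spider's Redis queue
from redis import Redis

redis_conn = Redis.from_url('redis://localhost:6379/0')
# 'product:start_urls' matches ProductSpider.redis_key; replace the URL with a real listing page
redis_conn.lpush('product:start_urls', 'https://example.com/products?page=1')

Each worker node then runs scrapy crawl product; the shared scheduler and RFPDupeFilter ensure the same request is not fetched twice across workers.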
IV. ETL Data Processing
1. Airflow data pipeline
# etl/dags/data_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1)
}

dag = DAG(
    'product_etl',
    default_args=default_args,
    schedule_interval='@daily'
)
def extract():
    from redis import Redis
    import json
    # Pull every raw item the crawler has pushed so far
    redis = Redis.from_url('redis://localhost:6379/0')
    raw_data = [json.loads(item) for item in redis.lrange('raw_data', 0, -1)]
    return raw_data
def transform(raw_data):
    import pandas as pd
    df = pd.DataFrame(raw_data)
    # Data cleaning
    df = df.dropna()
    df['price'] = df['price'].astype(float)
    df['sales'] = df['sales'].astype(int)
    # Compute the sales amount per product
    df['sales_amount'] = df['price'] * df['sales']
    # Stamp each record with the processing date (the dashboard later aggregates by this column)
    df['date'] = pd.Timestamp.now().strftime('%Y-%m-%d')
    return df.to_dict('records')
def load(transformed_data):
    import pandas as pd
    from sqlalchemy import create_engine
    engine = create_engine('postgresql://user:pass@localhost:5432/data_warehouse')
    pd.DataFrame(transformed_data).to_sql('product_stats', engine, if_exists='append', index=False)
extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    op_args=[extract_task.output],
    dag=dag
)

load_task = PythonOperator(
    task_id='load',
    python_callable=load,
    op_args=[transform_task.output],
    dag=dag
)
extract_task >> transform_task >> load_task
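Note that the .output references pass data between tasks through XCom, which lives in the Airflow metadata database, so very large scrape batches are better staged in external storage (for example, the warehouse itself). For comparison, the same wiring can be expressed with Airflow 2's TaskFlow API, sketched below with the task bodies elided; this variant is not part of the original tutorial:

# etl/dags/data_pipeline_taskflow.py -- hypothetical TaskFlow variant of the same DAG
from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule_interval='@daily', start_date=datetime(2024, 1, 1), catchup=False)
def product_etl_taskflow():
    @task
    def extract():
        ...  # same Redis read as above

    @task
    def transform(raw_data):
        ...  # same pandas cleaning as above

    @task
    def load(records):
        ...  # same warehouse insert as above

    # Return values flow through XCom implicitly
    load(transform(extract()))

product_etl_taskflow()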
2. PySpark feature engineering
# feature/spark/feature_engineering.py
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import col, log

spark = SparkSession.builder \
    .appName("FeatureEngineering") \
    .getOrCreate()

# Read the warehouse table produced by the ETL pipeline
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/data_warehouse") \
    .option("dbtable", "product_stats") \
    .option("user", "user") \
    .option("password", "pass") \
    .load()
# Feature transformation
df = df.withColumn("log_sales", log(col("sales") + 1))

# Assemble the inputs into a single feature vector
assembler = VectorAssembler(
    inputCols=["price", "log_sales"],
    outputCol="features"
)
feature_df = assembler.transform(df)

# Standardize the features
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withStd=True,
    withMean=True
)
scaler_model = scaler.fit(feature_df)
scaled_df = scaler_model.transform(feature_df)
# Persist the feature data
scaled_df.write \
    .format("parquet") \
    .mode("overwrite") \
    .save("/data/features/product_features")
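One practical detail the script glosses over: the JDBC read needs the PostgreSQL driver on Spark's classpath. A common way to handle this, sketched here as an assumption (the driver version is only an example), is to request the package when the session is first built:

# Assumption: fetch the PostgreSQL JDBC driver via spark.jars.packages;
# use the driver version that matches your environment.
spark = SparkSession.builder \
    .appName("FeatureEngineering") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3") \
    .getOrCreate()

Equivalently, pass --packages org.postgresql:postgresql:42.7.3 to spark-submit and leave the builder unchanged.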
V. Machine Learning Modeling
1. Automated model training
# model/trainer.py
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
import mlflow
import mlflow.sklearn
def train_model():
    # Load the feature data written by the PySpark job
    # (only the flat columns are needed; the Spark vector columns are skipped)
    df = pd.read_parquet(
        "/data/features/product_features",
        columns=["price", "log_sales", "sales_amount"]
    )
    # Prepare the training data
    X = df[['price', 'log_sales']]
    y = df['sales_amount']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    # Enable MLflow tracking
    mlflow.set_experiment("Product Sales Prediction")
    with mlflow.start_run():
        # Train the model
        model = RandomForestRegressor(n_estimators=100, random_state=42)
        model.fit(X_train, y_train)
        # Evaluate the model
        predictions = model.predict(X_test)
        rmse = mean_squared_error(y_test, predictions) ** 0.5
        # Log parameters and metrics
        mlflow.log_param("n_estimators", 100)
        mlflow.log_metric("rmse", rmse)
        # Save the model
        mlflow.sklearn.log_model(model, "model")
        print(f"Model trained with RMSE: {rmse}")
    return model

if __name__ == "__main__":
    train_model()
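The run above stores the model under the artifact path "model". To let the prediction service load it without hard-coding a run ID, one option is the MLflow Model Registry, sketched below; the registry name ProductSalesModel is an example, not part of the original tutorial:

# register_model.py -- hypothetical registration step; "ProductSalesModel" is an example name
import mlflow

run_id = "<run_id>"  # the MLflow run ID produced by train_model()
mlflow.register_model(f"runs:/{run_id}/model", "ProductSalesModel")

# The serving side can then load a registered version instead of a run URI, e.g.:
# model = mlflow.sklearn.load_model("models:/ProductSalesModel/1")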
2. Prediction service API
# model/predictor.py
from flask import Flask, request, jsonify
import mlflow.sklearn
import pandas as pd

app = Flask(__name__)

# Load the model logged by trainer.py; replace <run_id> with the actual MLflow run ID
model = mlflow.sklearn.load_model("runs:/<run_id>/model")

@app.route('/predict', methods=['POST'])
def predict():
    data = request.json
    input_data = pd.DataFrame([{
        'price': data['price'],
        'log_sales': data.get('log_sales', 0)
    }])
    prediction = model.predict(input_data)[0]
    # Cast the numpy scalar to a plain float so it can be JSON-serialized
    return jsonify({'prediction': float(prediction)})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5001)
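A quick way to exercise the endpoint once the service is running (the input values are arbitrary):

# client_example.py -- call the prediction API with the requests library
import requests

resp = requests.post(
    "http://localhost:5001/predict",
    json={"price": 99.9, "log_sales": 4.6},
)
print(resp.json())  # e.g. {"prediction": ...}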
VI. Visualization Dashboard
1. Flask data API
# web/app.py
from flask import Flask, render_template
from flask_sqlalchemy import SQLAlchemy
import pandas as pd
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://user:pass@localhost:5432/data_warehouse'
db = SQLAlchemy(app)
@app.route('/api/sales-trend')
def sales_trend():
    df = pd.read_sql("""
        SELECT date, SUM(sales_amount) AS total_sales
        FROM product_stats
        GROUP BY date
        ORDER BY date
    """, db.engine)
    return df.to_json(orient='records')

@app.route('/api/top-products')
def top_products():
    df = pd.read_sql("""
        SELECT name, SUM(sales) AS total_sales
        FROM product_stats
        GROUP BY name
        ORDER BY total_sales DESC
        LIMIT 10
    """, db.engine)
    return df.to_json(orient='records')

@app.route('/')
def dashboard():
    return render_template('dashboard.html')

if __name__ == '__main__':
    app.run(debug=True)
2. ECharts visualization
<!-- web/templates/dashboard.html -->
<!DOCTYPE html>
<html>
<head>
<script src="https://cdn.jsdelivr.net/npm/echarts@5.4.3/dist/echarts.min.js"></script>
</head>
<body>
<div id="sales-trend" style="width: 800px;height:400px;"></div>
<div id="top-products" style="width: 600px;height:400px;"></div>
<script>
    // Sales trend chart
    fetch('/api/sales-trend')
        .then(res => res.json())
        .then(data => {
            const chart = echarts.init(document.getElementById('sales-trend'))
            chart.setOption({
                title: { text: 'Sales Trend' },
                xAxis: { data: data.map(item => item.date) },
                yAxis: {},
                series: [{
                    name: 'Sales Amount',
                    type: 'line',
                    data: data.map(item => item.total_sales)
                }]
            })
        })

    // Top-selling products chart
    fetch('/api/top-products')
        .then(res => res.json())
        .then(data => {
            const chart = echarts.init(document.getElementById('top-products'))
            chart.setOption({
                title: { text: 'Top 10 Best-Selling Products' },
                tooltip: {},
                xAxis: { data: data.map(item => item.name) },
                yAxis: {},
                series: [{
                    name: 'Units Sold',
                    type: 'bar',
                    data: data.map(item => item.total_sales)
                }]
            })
        })
</script>
</body>
</html>
VII. Deployment and Optimization
1. Containerized deployment with Docker
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
# Crawler service
CMD ["scrapy", "crawl", "product"]
2. Kubernetes orchestration configuration
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: data-system
spec:
  replicas: 3
  selector:
    matchLabels:
      app: data-system
  template:
    metadata:
      labels:
        app: data-system
    spec:
      containers:
        - name: crawler
          image: data-crawler:latest
          resources:
            limits:
              cpu: "1"
              memory: 1Gi
        - name: api
          image: data-api:latest
          ports:
            - containerPort: 5000
          resources:
            limits:
              cpu: "2"
              memory: 2Gi
VIII. Summary and Next Steps
By the end of this tutorial you have learned how to:
- Build a distributed data collection system
- Design automated ETL workflows
- Run large-scale feature engineering
- Develop and deploy machine learning models
- Build a data visualization dashboard
Directions for further study:
- Real-time stream processing (Kafka + Flink)
- Integrating deep learning models
- Automated model monitoring
- A data quality monitoring framework