Python Intelligent Data Analysis System in Practice: The Full Pipeline from Data Collection to Machine Learning Visualization | Data Science Development Guide

Published: June 10, 2024

I. System Architecture Design

This tutorial builds a complete data analysis system with the following core modules:

  • Distributed crawler: data collection with Scrapy-Redis
  • ETL pipeline: data processing scheduled by Apache Airflow
  • Feature engineering: large-scale data processing with PySpark
  • Machine learning: automated model training with scikit-learn and MLflow experiment tracking
  • Visualization dashboard: analysis results rendered with Flask + ECharts

Tech stack: Python 3.9 + Scrapy + Airflow + PySpark + Flask + scikit-learn + MLflow

II. Environment Setup and Project Initialization

1. Create a virtual environment

# Create a Python virtual environment
python -m venv data-env

# Activate the environment
source data-env/bin/activate  # Linux/Mac
data-env\Scripts\activate     # Windows

# Install core dependencies (redis, mlflow and psycopg2-binary are used later in this tutorial)
pip install scrapy scrapy-redis apache-airflow pyspark flask flask-sqlalchemy pandas scikit-learn mlflow redis psycopg2-binary
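
The Dockerfile in section VII installs dependencies from a requirements.txt; a minimal sketch mirroring the packages above (versions left unpinned here; pin them for reproducible builds):

# requirements.txt
scrapy
scrapy-redis
apache-airflow
pyspark
flask
flask-sqlalchemy
pandas
scikit-learn
mlflow
redis
psycopg2-binary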

2. Project directory layout

data-system/
├── crawler/          # Crawler module
│   ├── spiders/      # Spider implementations
│   └── pipelines.py  # Item pipelines
├── etl/              # Data processing
│   ├── dags/         # Airflow DAGs
│   └── operators/    # Custom operators
├── feature/          # Feature engineering
│   ├── spark/        # PySpark scripts
│   └── processor.py  # Feature processor
├── model/            # Machine learning
│   ├── trainer.py    # Model training
│   └── predictor.py  # Prediction service
├── web/              # Visualization dashboard
│   ├── app.py        # Flask application
│   └── templates/    # Frontend templates
└── config.py         # Global configuration
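
The tree above references a global config.py; a minimal sketch of what it could centralize (all values are the placeholders used throughout this tutorial):

# config.py -- global configuration (placeholder values from this tutorial)
REDIS_URL = 'redis://localhost:6379/0'
POSTGRES_URL = 'postgresql://user:pass@localhost:5432/data_warehouse'
RAW_DATA_KEY = 'raw_data'                 # Redis list the crawler writes to
START_URLS_KEY = 'product:start_urls'     # Redis key that seeds the spider
FEATURE_PATH = '/data/features/product_features'  # Parquet output of the Spark job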

III. Distributed Crawler Implementation

1. Scrapy-Redis configuration

# crawler/settings.py
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
REDIS_URL = 'redis://localhost:6379/0'

# Distributed crawling options
SCHEDULER_PERSIST = True
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'

# Register the item pipeline defined below
ITEM_PIPELINES = {
    'crawler.pipelines.RedisPipeline': 300
}

# crawler/pipelines.py
import json

class RedisPipeline:
    """Push scraped items onto a Redis list for the downstream ETL."""
    def process_item(self, item, spider):
        redis_conn = spider.server  # Redis connection owned by the RedisSpider
        redis_conn.lpush('raw_data', json.dumps(dict(item)))
        return item

2. Example: an e-commerce product spider

# crawler/spiders/product_spider.py
import scrapy
from scrapy_redis.spiders import RedisSpider

class ProductSpider(RedisSpider):
    name = 'product'
    redis_key = 'product:start_urls'

    def parse(self, response):
        for product in response.css('.product-item'):
            yield {
                'name': product.css('h3::text').get(),
                'price': float(product.css('.price::text').re_first(r'\d+\.\d+', default='0')),
                'sales': int(product.css('.sales::text').re_first(r'\d+', default='0')),
                'shop': product.css('.shop::attr(title)').get()
            }
        
        next_page = response.css('.next-page::attr(href)').get()
        if next_page:
            yield response.follow(next_page, self.parse)
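
To kick off a crawl, seed the start-URL queue in Redis; each worker running the spider then pulls URLs from the shared queue. A minimal example (the URL is a placeholder):

# Push a seed URL onto the queue the spider listens on
redis-cli lpush product:start_urls "https://example.com/products?page=1"

# Start a crawler instance (run on multiple machines for distributed crawling)
scrapy crawl product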

IV. ETL Data Processing

1. Airflow data pipeline

# etl/dags/data_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1)
}

dag = DAG(
    'product_etl',
    default_args=default_args,
    schedule_interval='@daily'
)

def extract():
    from redis import Redis
    import json
    redis = Redis.from_url('redis://localhost:6379/0')
    raw_data = [json.loads(item) for item in redis.lrange('raw_data', 0, -1)]
    return raw_data

def transform(raw_data):
    import pandas as pd
    df = pd.DataFrame(raw_data)
    # Clean the data
    df = df.dropna()
    df['price'] = df['price'].astype(float)
    df['sales'] = df['sales'].astype(int)
    # Compute revenue per product
    df['sales_amount'] = df['price'] * df['sales']
    # Stamp records with the load date (the dashboard in section VI groups by this column)
    df['date'] = pd.Timestamp.today().strftime('%Y-%m-%d')
    return df.to_dict('records')

def load(transformed_data):
    import pandas as pd
    from sqlalchemy import create_engine
    engine = create_engine('postgresql://user:pass@localhost:5432/data_warehouse')
    pd.DataFrame(transformed_data).to_sql('product_stats', engine, if_exists='append', index=False)

extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    op_args=[extract_task.output],
    dag=dag
)

load_task = PythonOperator(
    task_id='load',
    python_callable=load,
    op_args=[transform_task.output],
    dag=dag
)

extract_task >> transform_task >> load_task
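
A quick way to exercise the pipeline locally, assuming a standard Airflow 2.x setup:

# Run the whole DAG once, end to end, without the scheduler
airflow dags test product_etl 2024-01-01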

2. Feature engineering with PySpark

# feature/spark/feature_engineering.py
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import col, log

spark = SparkSession.builder \
    .appName("FeatureEngineering") \
    .getOrCreate()

# Read the data from the warehouse
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/data_warehouse") \
    .option("dbtable", "product_stats") \
    .option("user", "user") \
    .option("password", "pass") \
    .load()

# Feature transformation: log-scale the heavily skewed sales counts
df = df.withColumn("log_sales", log(col("sales") + 1))

# Assemble feature columns into a single vector
assembler = VectorAssembler(
    inputCols=["price", "log_sales"],
    outputCol="features"
)

feature_df = assembler.transform(df)

# Standardize features to zero mean and unit variance
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withStd=True,
    withMean=True
)

scaler_model = scaler.fit(feature_df)
scaled_df = scaler_model.transform(feature_df)

# Persist the feature table
scaled_df.write \
    .format("parquet") \
    .mode("overwrite") \
    .save("/data/features/product_features")
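
Running the job requires the PostgreSQL JDBC driver on the Spark classpath; a sketch (the driver version below is an assumption, match it to your server):

spark-submit \
  --packages org.postgresql:postgresql:42.7.3 \
  feature/spark/feature_engineering.py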

V. Machine Learning Modeling

1. Automated model training

# model/trainer.py
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
import mlflow
import mlflow.sklearn

def train_model():
    # Load the feature table produced by the Spark job
    df = pd.read_parquet("/data/features/product_features")
    
    # Prepare training data
    X = df[['price', 'log_sales']]
    y = df['sales_amount']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # Enable MLflow experiment tracking
    mlflow.set_experiment("Product Sales Prediction")
    
    with mlflow.start_run():
        # Train the model
        model = RandomForestRegressor(n_estimators=100, random_state=42)
        model.fit(X_train, y_train)
        
        # Evaluate on the held-out test set
        predictions = model.predict(X_test)
        rmse = mean_squared_error(y_test, predictions) ** 0.5  # RMSE, portable across sklearn versions
        
        # Log parameters and metrics
        mlflow.log_param("n_estimators", 100)
        mlflow.log_metric("rmse", rmse)
        
        # Save the model artifact
        mlflow.sklearn.log_model(model, "model")
        
        print(f"Model trained with RMSE: {rmse}")
        return model

if __name__ == "__main__":
    train_model()
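
To train and then inspect runs locally (assuming MLflow's default local file store):

# Train the model and log the run to ./mlruns
python model/trainer.py

# Browse experiments, metrics and artifacts in the MLflow UI
mlflow ui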

2. Prediction service API

# model/predictor.py
from flask import Flask, request, jsonify
import mlflow.sklearn
import pandas as pd

app = Flask(__name__)

# Load the trained model from a specific MLflow run (substitute a real run ID)
model = mlflow.sklearn.load_model("runs:/<RUN_ID>/model")

@app.route('/predict', methods=['POST'])
def predict():
    data = request.json
    input_data = pd.DataFrame([{
        'price': data['price'],
        'log_sales': data.get('log_sales', 0)
    }])
    
    prediction = model.predict(input_data)[0]
    return jsonify({'prediction': float(prediction)})  # cast the numpy scalar for JSON serialization

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5001)
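
A quick smoke test of the endpoint once the service is running:

curl -X POST http://localhost:5001/predict \
  -H "Content-Type: application/json" \
  -d '{"price": 19.9, "log_sales": 4.6}'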

VI. Building the Visualization Dashboard

1. Flask data API

# web/app.py
from flask import Flask, render_template
from flask_sqlalchemy import SQLAlchemy
import pandas as pd

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://user:pass@localhost:5432/data_warehouse'
db = SQLAlchemy(app)

@app.route('/api/sales-trend')
def sales_trend():
    df = pd.read_sql("""
        SELECT date, SUM(sales_amount) as total_sales 
        FROM product_stats 
        GROUP BY date
        ORDER BY date
    """, db.engine)
    return df.to_json(orient='records')

@app.route('/api/top-products')
def top_products():
    df = pd.read_sql("""
        SELECT name, SUM(sales) as total_sales
        FROM product_stats
        GROUP BY name
        ORDER BY total_sales DESC
        LIMIT 10
    """, db.engine)
    return df.to_json(orient='records')

@app.route('/')
def dashboard():
    return render_template('dashboard.html')

if __name__ == '__main__':
    app.run(debug=True)

2. ECharts visualization

<!-- web/templates/dashboard.html -->
<!DOCTYPE html>
<html>
<head>
    <script src="https://cdn.jsdelivr.net/npm/echarts@5.4.3/dist/echarts.min.js"></script>
</head>
<body>
    <div id="sales-trend" style="width: 800px;height:400px;"></div>
    <div id="top-products" style="width: 600px;height:400px;"></div>
    
    <script>
        // Sales trend chart
        fetch('/api/sales-trend')
            .then(res => res.json())
            .then(data => {
                const chart = echarts.init(document.getElementById('sales-trend'))
                chart.setOption({
                    title: { text: 'Sales Trend' },
                    xAxis: { data: data.map(item => item.date) },
                    yAxis: {},
                    series: [{
                        name: 'Revenue',
                        type: 'line',
                        data: data.map(item => item.total_sales)
                    }]
                })
            })
        
        // Top-selling products chart
        fetch('/api/top-products')
            .then(res => res.json())
            .then(data => {
                const chart = echarts.init(document.getElementById('top-products'))
                chart.setOption({
                    title: { text: 'Top 10 Best-Selling Products' },
                    tooltip: {},
                    xAxis: { data: data.map(item => item.name) },
                    yAxis: {},
                    series: [{
                        name: 'Units Sold',
                        type: 'bar',
                        data: data.map(item => item.total_sales)
                    }]
                })
            })
    </script>
</body>
</html>

VII. Deployment and Optimization

1. Containerized deployment with Docker

# Dockerfile
FROM python:3.9-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

# Crawler service entrypoint
CMD ["scrapy", "crawl", "product"]
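
Build-and-run sketch (the image name matches the Kubernetes manifest below; --network host is a convenience for reaching the local Redis on Linux hosts):

# Build the crawler image
docker build -t data-crawler:latest .

# Run it against the Redis instance on the host
docker run --rm --network host data-crawler:latest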

2. Kubernetes orchestration

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: data-system
spec:
  replicas: 3
  selector:
    matchLabels:
      app: data-system
  template:
    metadata:
      labels:
        app: data-system
    spec:
      containers:
      - name: crawler
        image: data-crawler:latest
        resources:
          limits:
            cpu: "1"
            memory: 1Gi
      - name: api
        image: data-api:latest
        ports:
        - containerPort: 5000
        resources:
          limits:
            cpu: "2"
            memory: 2Gi

VIII. Summary and Extensions

Through this tutorial, you have learned:

  1. Building a distributed data collection system
  2. Designing automated ETL workflows
  3. Processing features at scale
  4. Developing and deploying machine learning models
  5. Building a data visualization dashboard

Directions for further study:

  • Real-time stream processing (Kafka + Flink)
  • Integrating deep learning models
  • Automated model monitoring
  • Data quality validation frameworks