Python智能农业监测系统开发:从IoT传感器到AI预测全流程实战 | Python物联网应用

2025-08-19 0 547

发布日期:2024年10月30日

一、系统架构设计

本教程将构建一个完整的智能农业监测平台,包含以下核心模块:

  • 数据采集层:多源传感器数据整合
  • 图像处理层:OpenCV植物健康分析
  • 预测分析层:时间序列与机器学习
  • 可视化层:实时监控仪表盘
  • 控制执行层:自动化设备控制

技术栈:Python 3.10 + Flask + OpenCV + TensorFlow + MQTT

二、项目初始化与配置

1. 环境准备与依赖安装

# 创建项目目录
mkdir smart-agriculture-system
cd smart-agriculture-system

# 创建虚拟环境
python -m venv venv
source venv/bin/activate  # Linux/Mac
# venvScriptsactivate  # Windows

# 安装核心依赖
pip install flask flask-socketio opencv-python tensorflow scikit-learn
pip install paho-mqtt pandas numpy matplotlib
pip install pillow requests celery redis

# 项目结构
smart-agriculture-system/
├── app/
│   ├── __init__.py
│   ├── models/
│   │   ├── sensor_models.py
│   │   └── prediction_models.py
│   ├── services/
│   │   ├── data_collection.py
│   │   └── image_processing.py
│   ├── routes/
│   │   ├── api.py
│   │   └── dashboard.py
│   ├── static/
│   └── templates/
├── config.py
├── requirements.txt
└── run.py

三、传感器数据采集

1. 多传感器数据采集服务

# app/services/data_collection.py
import paho.mqtt.client as mqtt
import json
from datetime import datetime
from .database import db

class SensorDataCollector:
    def __init__(self):
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        
        # 传感器配置
        self.sensors = {
            'temperature': {'unit': '°C', 'min': -10, 'max': 50},
            'humidity': {'unit': '%', 'min': 0, 'max': 100},
            'soil_moisture': {'unit': '%', 'min': 0, 'max': 100},
            'light_intensity': {'unit': 'lux', 'min': 0, 'max': 100000}
        }

    def on_connect(self, client, userdata, flags, rc):
        print("Connected to MQTT broker")
        # 订阅所有传感器主题
        for sensor in self.sensors.keys():
            client.subscribe(f"sensors/{sensor}")

    def on_message(self, client, userdata, msg):
        try:
            topic = msg.topic
            payload = json.loads(msg.payload.decode())
            
            sensor_type = topic.split('/')[1]
            self.process_sensor_data(sensor_type, payload)
            
        except Exception as e:
            print(f"Error processing message: {e}")

    def process_sensor_data(self, sensor_type, data):
        # 数据验证
        if sensor_type not in self.sensors:
            return
        
        sensor_config = self.sensors[sensor_type]
        value = data['value']
        
        if not (sensor_config['min'] <= value <= sensor_config['max']):
            print(f"Invalid {sensor_type} value: {value}")
            return
        
        # 存储到数据库
        sensor_data = {
            'sensor_type': sensor_type,
            'value': value,
            'timestamp': datetime.now(),
            'location': data.get('location', 'field1')
        }
        
        db.sensor_data.insert_one(sensor_data)
        print(f"Stored {sensor_type} data: {value}")

    def start_collection(self):
        self.client.connect("localhost", 1883, 60)
        self.client.loop_start()

四、计算机视觉植物分析

1. OpenCV植物健康检测

# app/services/image_processing.py
import cv2
import numpy as np
from PIL import Image
import io

class PlantHealthAnalyzer:
    def __init__(self):
        self.healthy_ranges = {
            'hue': (30, 90),        # 绿色色调范围
            'saturation': (40, 255), # 饱和度范围
            'value': (30, 255)      # 亮度范围
        }

    def analyze_plant_health(self, image_data):
        # 转换图像数据
        nparr = np.frombuffer(image_data, np.uint8)
        image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        
        # 转换为HSV颜色空间
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        
        # 创建健康绿色掩码
        lower_green = np.array([self.healthy_ranges['hue'][0], 
                               self.healthy_ranges['saturation'][0], 
                               self.healthy_ranges['value'][0]])
        upper_green = np.array([self.healthy_ranges['hue'][1], 
                               self.healthy_ranges['saturation'][1], 
                               self.healthy_ranges['value'][1]])
        
        mask = cv2.inRange(hsv, lower_green, upper_green)
        
        # 计算健康区域比例
        total_pixels = image.shape[0] * image.shape[1]
        healthy_pixels = cv2.countNonZero(mask)
        health_ratio = healthy_pixels / total_pixels
        
        # 检测病害斑点
        disease_spots = self.detect_disease_spots(image, mask)
        
        return {
            'health_score': round(health_ratio * 100, 2),
            'healthy_pixels': healthy_pixels,
            'total_pixels': total_pixels,
            'disease_spots': len(disease_spots),
            'health_status': self.get_health_status(health_ratio)
        }

    def detect_disease_spots(self, image, mask):
        # 反转掩码获取非健康区域
        inverse_mask = cv2.bitwise_not(mask)
        
        # 形态学操作去除噪声
        kernel = np.ones((5,5), np.uint8)
        cleaned = cv2.morphologyEx(inverse_mask, cv2.MORPH_OPEN, kernel)
        
        # 查找轮廓
        contours, _ = cv2.findContours(cleaned, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        # 过滤小斑点
        disease_spots = []
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > 100:  # 最小面积阈值
                disease_spots.append(contour)
        
        return disease_spots

    def get_health_status(self, ratio):
        if ratio >= 0.7:
            return "健康"
        elif ratio >= 0.4:
            return "一般"
        else:
            return "需要关注"

五、时间序列预测模型

1. 土壤湿度预测

# app/models/prediction_models.py
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error
import joblib

class SoilMoisturePredictor:
    def __init__(self):
        self.model = RandomForestRegressor(n_estimators=100, random_state=42)
        self.features = ['temperature', 'humidity', 'light_intensity', 'hour', 'day_of_week']
        
    def prepare_training_data(self, sensor_data):
        """准备训练数据"""
        df = pd.DataFrame(sensor_data)
        
        # 特征工程
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df['hour'] = df['timestamp'].dt.hour
        df['day_of_week'] = df['timestamp'].dt.dayofweek
        df['month'] = df['timestamp'].dt.month
        
        # 创建滞后特征
        for lag in [1, 2, 3, 6, 12]:  # 1,2,3,6,12小时滞后
            df[f'soil_moisture_lag_{lag}'] = df['soil_moisture'].shift(lag)
        
        # 删除包含NaN的行
        df = df.dropna()
        
        return df

    def train_model(self, training_data):
        """训练预测模型"""
        df = self.prepare_training_data(training_data)
        
        X = df[self.features + [f'soil_moisture_lag_{lag}' for lag in [1,2,3,6,12]]]
        y = df['soil_moisture']
        
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        
        self.model.fit(X_train, y_train)
        
        # 评估模型
        predictions = self.model.predict(X_test)
        mae = mean_absolute_error(y_test, predictions)
        print(f"Model MAE: {mae:.2f}")
        
        # 保存模型
        joblib.dump(self.model, 'soil_moisture_model.pkl')
        
        return mae

    def predict_next_24h(self, current_data):
        """预测未来24小时土壤湿度"""
        predictions = []
        current_state = current_data.copy()
        
        for hour in range(24):
            # 准备预测输入
            input_features = self.prepare_prediction_input(current_state)
            
            # 进行预测
            prediction = self.model.predict([input_features])[0]
            predictions.append({
                'hour': hour,
                'predicted_moisture': round(prediction, 2),
                'timestamp': current_state['timestamp'] + pd.Timedelta(hours=hour+1)
            })
            
            # 更新状态用于下一步预测
            current_state['soil_moisture'] = prediction
            current_state['timestamp'] += pd.Timedelta(hours=1)
        
        return predictions

六、Flask Web应用

1. 实时数据API

# app/routes/api.py
from flask import Blueprint, jsonify, request
from app.services.data_collection import SensorDataCollector
from app.services.image_processing import PlantHealthAnalyzer
from app.models.prediction_models import SoilMoisturePredictor

api_bp = Blueprint('api', __name__)
sensor_collector = SensorDataCollector()
plant_analyzer = PlantHealthAnalyzer()
predictor = SoilMoisturePredictor()

@api_bp.route('/api/sensor-data', methods=['GET'])
def get_sensor_data():
    """获取传感器数据"""
    hours = request.args.get('hours', 24, type=int)
    
    data = db.sensor_data.find({
        'timestamp': {'$gte': datetime.now() - timedelta(hours=hours)}
    }).sort('timestamp', -1)
    
    return jsonify({
        'sensor_data': list(data),
        'timestamp': datetime.now()
    })

@api_bp.route('/api/analyze-plant', methods=['POST'])
def analyze_plant():
    """分析植物健康"""
    if 'image' not in request.files:
        return jsonify({'error': 'No image provided'}), 400
    
    image_file = request.files['image']
    image_data = image_file.read()
    
    analysis_result = plant_analyzer.analyze_plant_health(image_data)
    
    return jsonify(analysis_result)

@api_bp.route('/api/predict-moisture', methods=['GET'])
def predict_moisture():
    """预测土壤湿度"""
    latest_data = db.sensor_data.find().sort('timestamp', -1).limit(12)
    predictions = predictor.predict_next_24h(latest_data)
    
    return jsonify({
        'predictions': predictions,
        'generated_at': datetime.now()
    })

@api_bp.route('/api/irrigation-schedule', methods=['GET'])
def get_irrigation_schedule():
    """获取灌溉计划"""
    predictions = predictor.predict_next_24h()
    
    # 基于预测结果生成灌溉计划
    irrigation_plan = []
    for pred in predictions:
        if pred['predicted_moisture'] < 30:  # 湿度低于30%需要灌溉
            irrigation_plan.append({
                'start_time': pred['timestamp'],
                'duration_minutes': 30,
                'reason': '低土壤湿度预测'
            })
    
    return jsonify({'irrigation_plan': irrigation_plan})

七、自动化控制执行

1. 智能灌溉控制器

# app/services/irrigation_controller.py
import RPi.GPIO as GPIO
import time
from datetime import datetime
import threading

class IrrigationController:
    def __init__(self):
        # GPIO引脚配置
        self.water_pump_pin = 17
        self.solenoid_valve_pin = 27
        
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.water_pump_pin, GPIO.OUT)
        GPIO.setup(self.solenoid_valve_pin, GPIO.OUT)
        
        self.is_irrigating = False
        self.irrigation_thread = None

    def start_irrigation(self, zone, duration_minutes):
        """启动灌溉"""
        if self.is_irrigating:
            return False
        
        self.is_irrigating = True
        self.irrigation_thread = threading.Thread(
            target=self._irrigate_zone,
            args=(zone, duration_minutes)
        )
        self.irrigation_thread.start()
        
        return True

    def _irrigate_zone(self, zone, duration_minutes):
        """执行灌溉操作"""
        try:
            # 打开水泵和电磁阀
            GPIO.output(self.water_pump_pin, GPIO.HIGH)
            GPIO.output(self.solenoid_valve_pin, GPIO.HIGH)
            
            # 记录灌溉开始
            irrigation_record = {
                'zone': zone,
                'start_time': datetime.now(),
                'duration_minutes': duration_minutes,
                'status': 'running'
            }
            db.irrigation_logs.insert_one(irrigation_record)
            
            # 等待灌溉完成
            time.sleep(duration_minutes * 60)
            
            # 关闭设备
            GPIO.output(self.water_pump_pin, GPIO.LOW)
            GPIO.output(self.solenoid_valve_pin, GPIO.LOW)
            
            # 更新记录
            irrigation_record['end_time'] = datetime.now()
            irrigation_record['status'] = 'completed'
            db.irrigation_logs.update_one(
                {'_id': irrigation_record['_id']},
                {'$set': irrigation_record}
            )
            
        except Exception as e:
            print(f"Irrigation error: {e}")
            # 紧急关闭
            GPIO.output(self.water_pump_pin, GPIO.LOW)
            GPIO.output(self.solenoid_valve_pin, GPIO.LOW)
            
        finally:
            self.is_irrigating = False

    def emergency_stop(self):
        """紧急停止灌溉"""
        GPIO.output(self.water_pump_pin, GPIO.LOW)
        GPIO.output(self.solenoid_valve_pin, GPIO.LOW)
        self.is_irrigating = False
        
        if self.irrigation_thread and self.irrigation_thread.is_alive():
            self.irrigation_thread.join(timeout=1.0)

    def get_status(self):
        """获取灌溉状态"""
        return {
            'is_irrigating': self.is_irrigating,
            'current_zone': self.current_zone if self.is_irrigating else None,
            'start_time': self.start_time if self.is_irrigating else None
        }

八、部署与优化

1. Docker容器化部署

# Dockerfile
FROM python:3.10-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y 
    libglib2.0-0 
    libsm6 
    libxext6 
    libxrender-dev 
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .
RUN pip install -r requirements.txt

# 复制应用代码
COPY . .

# 创建非root用户
RUN useradd -m -u 1000 farmer
USER farmer

EXPOSE 5000

CMD ["gunicorn", "run:app", "-b", "0.0.0.0:5000", "--workers", "4"]

2. 性能监控配置

# config.py
import logging
from logging.handlers import RotatingFileHandler

class Config:
    # 性能监控配置
    MONITORING_INTERVAL = 60  # 秒
    MAX_LOG_SIZE = 10 * 1024 * 1024  # 10MB
    BACKUP_COUNT = 5
    
    # 数据库配置
    MONGODB_URI = "mongodb://localhost:27017/smart_farm"
    REDIS_URL = "redis://localhost:6379/0"
    
    # MQTT配置
    MQTT_BROKER = "localhost"
    MQTT_PORT = 1883
    
    @staticmethod
    def setup_logging():
        handler = RotatingFileHandler(
            'app.log',
            maxBytes=Config.MAX_LOG_SIZE,
            backupCount=Config.BACKUP_COUNT
        )
        handler.setLevel(logging.INFO)
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        
        logger = logging.getLogger()
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)

九、总结与扩展

通过本教程,您已经掌握了:

  1. 农业物联网数据采集技术
  2. 计算机视觉植物健康分析
  3. 时间序列预测模型构建
  4. Flask Web应用开发
  5. 自动化控制系统实现

扩展学习方向:

  • 无人机农田监测集成
  • 区块链农产品溯源
  • 边缘计算设备优化
  • 多农场分布式管理
Python智能农业监测系统开发:从IoT传感器到AI预测全流程实战 | Python物联网应用
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 python Python智能农业监测系统开发:从IoT传感器到AI预测全流程实战 | Python物联网应用 https://www.taomawang.com/server/python/908.html

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务