发布日期:2024年10月30日
一、系统架构设计
本教程将构建一个完整的智能农业监测平台,包含以下核心模块:
- 数据采集层:多源传感器数据整合
- 图像处理层:OpenCV植物健康分析
- 预测分析层:时间序列与机器学习
- 可视化层:实时监控仪表盘
- 控制执行层:自动化设备控制
技术栈:Python 3.10 + Flask + OpenCV + scikit-learn + TensorFlow + MongoDB + MQTT
二、项目初始化与配置
1. 环境准备与依赖安装
# Create the project directory
mkdir smart-agriculture-system
cd smart-agriculture-system
# Create a virtual environment
python -m venv venv
source venv/bin/activate # Linux/Mac
# venv\Scripts\activate # Windows
# Install core dependencies
pip install flask flask-socketio opencv-python tensorflow scikit-learn
pip install paho-mqtt pandas numpy matplotlib
pip install pillow requests celery redis
# MongoDB driver used by the data layer, and the WSGI server the Dockerfile starts
pip install pymongo gunicorn
# 项目结构
smart-agriculture-system/
├── app/
│ ├── __init__.py
│ ├── models/
│ │ ├── sensor_models.py
│ │ └── prediction_models.py
│ ├── services/
│ │ ├── database.py
│ │ ├── data_collection.py
│ │ ├── image_processing.py
│ │ └── irrigation_controller.py
│ ├── routes/
│ │ ├── api.py
│ │ └── dashboard.py
│ ├── static/
│ └── templates/
├── config.py
├── requirements.txt
└── run.py
三、传感器数据采集
1. 多传感器数据采集服务
# app/services/data_collection.py
import paho.mqtt.client as mqtt
import json
from datetime import datetime
from .database import db
class SensorDataCollector:
    """Collect sensor readings published over MQTT and persist them.

    Each known sensor type is subscribed to on the topic
    ``sensors/<sensor_type>``. Payloads are JSON objects that carry a numeric
    ``value`` and an optional ``location``. Readings inside the configured
    range are written to ``db.sensor_data``.
    """

    def __init__(self):
        # paho-mqtt 2.x introduced a callback API version argument. VERSION1
        # keeps the (client, userdata, flags, rc) on_connect signature used
        # below; 1.x has no such enum, so fall back to the bare constructor.
        api_versions = getattr(mqtt, "CallbackAPIVersion", None)
        if api_versions is not None:
            self.client = mqtt.Client(api_versions.VERSION1)
        else:
            self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        # Sensor configuration: display unit and accepted value range
        self.sensors = {
            'temperature': {'unit': '°C', 'min': -10, 'max': 50},
            'humidity': {'unit': '%', 'min': 0, 'max': 100},
            'soil_moisture': {'unit': '%', 'min': 0, 'max': 100},
            'light_intensity': {'unit': 'lux', 'min': 0, 'max': 100000},
        }

    def on_connect(self, client, userdata, flags, rc):
        """Subscribe to every sensor topic once the broker accepts us."""
        # A non-zero result code means the broker refused the connection, so
        # subscribing would be pointless and the success message misleading.
        if rc != 0:
            print(f"MQTT connection failed with result code {rc}")
            return
        print("Connected to MQTT broker")
        for sensor in self.sensors:
            client.subscribe(f"sensors/{sensor}")

    def on_message(self, client, userdata, msg):
        """Decode one MQTT message and hand it to ``process_sensor_data``."""
        # Broad catch on purpose: this is the callback boundary, and one bad
        # message must not stop the processing of later ones.
        try:
            payload = json.loads(msg.payload.decode())
            sensor_type = msg.topic.split('/')[1]
            self.process_sensor_data(sensor_type, payload)
        except Exception as e:
            print(f"Error processing message: {e}")

    def process_sensor_data(self, sensor_type, data):
        """Validate a reading and store it; invalid readings are dropped.

        Args:
            sensor_type: Key into ``self.sensors``; unknown types are ignored.
            data: Decoded payload, expected to be a dict with a numeric
                ``value`` and an optional ``location``.
        """
        sensor_config = self.sensors.get(sensor_type)
        if sensor_config is None:
            return

        value = data.get('value') if isinstance(data, dict) else None
        # bool is a subclass of int, but True/False is not a sensor reading.
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        if not is_number or not (sensor_config['min'] <= value <= sensor_config['max']):
            print(f"Invalid {sensor_type} value: {value}")
            return

        # Store in the database
        sensor_data = {
            'sensor_type': sensor_type,
            'value': value,
            'timestamp': datetime.now(),
            'location': data.get('location', 'field1'),
        }
        db.sensor_data.insert_one(sensor_data)
        print(f"Stored {sensor_type} data: {value}")

    def start_collection(self, host="localhost", port=1883, keepalive=60):
        """Connect to the broker and start the background network loop.

        Args:
            host: Broker host name.
            port: Broker TCP port.
            keepalive: MQTT keep-alive interval in seconds.
        """
        self.client.connect(host, port, keepalive)
        self.client.loop_start()
四、计算机视觉植物分析
1. OpenCV植物健康检测
# app/services/image_processing.py
import cv2
import numpy as np
from PIL import Image
import io
class PlantHealthAnalyzer:
    """Estimate plant health from the share of green pixels in an image."""

    def __init__(self):
        # HSV bounds treated as "healthy green" when building the mask
        self.healthy_ranges = {
            'hue': (30, 90),          # green hue range
            'saturation': (40, 255),  # saturation range
            'value': (30, 255),       # brightness range
        }

    def analyze_plant_health(self, image_data):
        """Analyse an encoded image and return health metrics.

        Args:
            image_data: Raw bytes of an encoded image file.

        Returns:
            dict with ``health_score`` (percentage of pixels inside the
            healthy range), ``healthy_pixels``, ``total_pixels``,
            ``disease_spots`` (count) and ``health_status``.

        Raises:
            ValueError: If ``image_data`` is empty or cannot be decoded.
        """
        if not image_data:
            raise ValueError("Image data is empty")

        # Decode the raw bytes into a BGR image
        nparr = np.frombuffer(image_data, np.uint8)
        image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        # imdecode signals failure by returning None instead of raising
        if image is None:
            raise ValueError("Image data could not be decoded")

        # Convert to the HSV colour space
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

        # Build the healthy-green mask
        channels = ('hue', 'saturation', 'value')
        lower_green = np.array([self.healthy_ranges[c][0] for c in channels])
        upper_green = np.array([self.healthy_ranges[c][1] for c in channels])
        mask = cv2.inRange(hsv, lower_green, upper_green)

        # Share of the image that falls inside the healthy range
        total_pixels = image.shape[0] * image.shape[1]
        healthy_pixels = cv2.countNonZero(mask)
        health_ratio = healthy_pixels / total_pixels

        # Detect disease spots
        disease_spots = self.detect_disease_spots(image, mask)

        return {
            'health_score': round(health_ratio * 100, 2),
            'healthy_pixels': healthy_pixels,
            'total_pixels': total_pixels,
            'disease_spots': len(disease_spots),
            'health_status': self.get_health_status(health_ratio),
        }

    def detect_disease_spots(self, image, mask):
        """Return contours of non-healthy regions larger than 100 px².

        Args:
            image: The decoded image (currently unused; kept for callers).
            mask: Binary mask of healthy pixels.
        """
        # Invert the mask to obtain the non-healthy regions
        inverse_mask = cv2.bitwise_not(mask)
        # Morphological opening removes small noise
        kernel = np.ones((5, 5), np.uint8)
        cleaned = cv2.morphologyEx(inverse_mask, cv2.MORPH_OPEN, kernel)
        # Find the outer contours
        contours, _ = cv2.findContours(
            cleaned, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )
        # Drop small specks below the minimum-area threshold
        return [c for c in contours if cv2.contourArea(c) > 100]

    def get_health_status(self, ratio):
        """Map a healthy-pixel ratio (0..1) to a status label."""
        if ratio >= 0.7:
            return "健康"
        if ratio >= 0.4:
            return "一般"
        return "需要关注"
五、时间序列预测模型
1. 土壤湿度预测
# app/models/prediction_models.py
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error
import joblib
class SoilMoisturePredictor:
    """Random-forest forecaster for hourly soil moisture.

    The model uses the columns in ``self.features`` plus lagged soil-moisture
    values (``soil_moisture_lag_<n>`` for every ``n`` in ``LAGS``).
    """

    # Hourly soil-moisture lags used as autoregressive features
    LAGS = (1, 2, 3, 6, 12)

    def __init__(self):
        self.model = RandomForestRegressor(n_estimators=100, random_state=42)
        self.features = ['temperature', 'humidity', 'light_intensity', 'hour', 'day_of_week']

    def _feature_columns(self):
        """Return the model's input column names in training order."""
        return list(self.features) + [f'soil_moisture_lag_{lag}' for lag in self.LAGS]

    def prepare_training_data(self, sensor_data):
        """Build the training frame: time features plus lag features.

        Args:
            sensor_data: Records convertible to a DataFrame that contain
                ``timestamp``, ``soil_moisture`` and the raw feature columns.

        Returns:
            DataFrame sorted oldest-first with rows containing NaN removed.
        """
        df = pd.DataFrame(sensor_data)
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        # shift() builds lags by row position, so rows must be oldest-first;
        # otherwise the "lag" columns would hold future values.
        df = df.sort_values('timestamp', kind='stable').reset_index(drop=True)

        # Feature engineering
        df['hour'] = df['timestamp'].dt.hour
        df['day_of_week'] = df['timestamp'].dt.dayofweek
        df['month'] = df['timestamp'].dt.month

        # Lag features (previous readings, by row position)
        for lag in self.LAGS:
            df[f'soil_moisture_lag_{lag}'] = df['soil_moisture'].shift(lag)

        # Drop rows that contain NaN (the first max(LAGS) rows at least)
        return df.dropna()

    def train_model(self, training_data, model_path='soil_moisture_model.pkl'):
        """Fit the model, print and return its MAE, and save it to disk.

        Args:
            training_data: Input accepted by ``prepare_training_data``.
            model_path: Where the fitted model is written with joblib.

        Raises:
            ValueError: If no rows remain after the lag features are built.
        """
        df = self.prepare_training_data(training_data)
        if df.empty:
            raise ValueError(
                f"Need more than {max(self.LAGS)} complete rows to train"
            )
        X = df[self._feature_columns()]
        y = df['soil_moisture']
        # Chronological hold-out: shuffling would put rows from the future
        # (and their lag values) into the training set and flatter the MAE.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, shuffle=False
        )
        self.model.fit(X_train, y_train)

        # Evaluate the model
        predictions = self.model.predict(X_test)
        mae = mean_absolute_error(y_test, predictions)
        print(f"Model MAE: {mae:.2f}")

        # Save the model
        joblib.dump(self.model, model_path)
        return mae

    def prepare_prediction_input(self, state):
        """Build one feature row, ordered like the training columns.

        Args:
            state: Mapping with ``timestamp`` (the hour being predicted), the
                raw feature values named in ``self.features`` and either
                ``soil_moisture_history`` (oldest to newest) or a single
                ``soil_moisture`` value.

        Returns:
            list of feature values.
        """
        when = pd.Timestamp(state['timestamp'])
        derived = {'hour': when.hour, 'day_of_week': when.dayofweek}
        row = [derived[name] if name in derived else state[name] for name in self.features]

        history = state.get('soil_moisture_history')
        if history is None or len(history) == 0:
            history = [state['soil_moisture']]
        for lag in self.LAGS:
            # history[-1] is the most recent value, i.e. lag 1 for the target
            # hour. When the history is shorter than the lag, reuse the oldest
            # value that is available.
            row.append(history[-min(lag, len(history))])
        return row

    def predict_next_24h(self, current_data):
        """Forecast soil moisture for each of the next 24 hours.

        Weather features are held at their current values. Each prediction is
        appended to the moisture history so later steps see it as a lag.

        Args:
            current_data: Mapping as described in ``prepare_prediction_input``
                whose ``timestamp`` is the time of the latest reading. It is
                not modified.

        Returns:
            list of 24 dicts with ``hour`` (0-23 step index),
            ``predicted_moisture`` and ``timestamp`` (start + hour + 1 hours).
        """
        state = dict(current_data)
        start = pd.Timestamp(state['timestamp'])

        history = state.get('soil_moisture_history')
        if history is None or len(history) == 0:
            history = [state['soil_moisture']]
        history = list(history)
        state['soil_moisture_history'] = history

        columns = self._feature_columns()
        predictions = []
        for hour in range(24):
            # Derive every timestamp from the fixed start so the offset is
            # applied exactly once per step.
            target_time = start + pd.Timedelta(hours=hour + 1)
            state['timestamp'] = target_time

            # Prepare the model input
            input_features = self.prepare_prediction_input(state)
            frame = pd.DataFrame([input_features], columns=columns)

            # Run the prediction
            prediction = float(self.model.predict(frame)[0])
            predictions.append({
                'hour': hour,
                'predicted_moisture': round(prediction, 2),
                'timestamp': target_time,
            })

            # Roll the state forward for the next step
            history.append(prediction)
            state['soil_moisture'] = prediction
        return predictions
六、Flask Web应用
1. 实时数据API
# app/routes/api.py
from datetime import datetime, timedelta

from flask import Blueprint, jsonify, request

from app.models.prediction_models import SoilMoisturePredictor
from app.services.data_collection import SensorDataCollector
from app.services.database import db
from app.services.image_processing import PlantHealthAnalyzer
# Blueprint on which every JSON endpoint of this module is registered
api_bp = Blueprint('api', __name__)
# Module-level service instances, created once at import time
sensor_collector = SensorDataCollector()
plant_analyzer = PlantHealthAnalyzer()
predictor = SoilMoisturePredictor()
@api_bp.route('/api/sensor-data', methods=['GET'])
def get_sensor_data():
    """Return sensor readings from the last ``hours`` hours, newest first.

    Query parameters:
        hours: Look-back window in hours (default 24, allowed 1 to 8760).

    Returns:
        JSON with ``sensor_data`` (list of readings) and ``timestamp``, or a
        400 response when ``hours`` is out of range.
    """
    hours = request.args.get('hours', 24, type=int)
    # Reject windows that are empty or large enough to overflow timedelta.
    if not 1 <= hours <= 24 * 365:
        return jsonify({'error': 'hours must be between 1 and 8760'}), 400

    since = datetime.now() - timedelta(hours=hours)
    # Exclude _id: it is an ObjectId, which jsonify cannot serialise.
    cursor = db.sensor_data.find(
        {'timestamp': {'$gte': since}},
        {'_id': 0},
    ).sort('timestamp', -1)
    return jsonify({
        'sensor_data': list(cursor),
        'timestamp': datetime.now(),
    })
@api_bp.route('/api/analyze-plant', methods=['POST'])
def analyze_plant():
    """Analyse the uploaded ``image`` file and return plant-health metrics.

    Returns:
        JSON analysis result, or a 400 response when the upload is missing,
        empty, or rejected by the analyzer with a ``ValueError``.
    """
    image_file = request.files.get('image')
    if image_file is None:
        return jsonify({'error': 'No image provided'}), 400

    image_data = image_file.read()
    if not image_data:
        return jsonify({'error': 'Uploaded image is empty'}), 400

    # A bad upload is a client error, not a server failure.
    try:
        analysis_result = plant_analyzer.analyze_plant_health(image_data)
    except ValueError as exc:
        return jsonify({'error': str(exc)}), 400
    return jsonify(analysis_result)
# Sensor types whose latest value the predictor needs as input
_PREDICTOR_INPUTS = ('temperature', 'humidity', 'light_intensity', 'soil_moisture')


def _latest_sensor_state():
    """Collect the newest stored value of every predictor input.

    Readings are stored one document per sensor type, so each type is looked
    up separately.

    Returns:
        ``(state, None)`` with a dict holding ``timestamp`` plus one entry per
        sensor type, or ``(None, sensor_type)`` naming the first type that
        has no stored reading.
    """
    state = {'timestamp': datetime.now()}
    for sensor_type in _PREDICTOR_INPUTS:
        latest = db.sensor_data.find_one(
            {'sensor_type': sensor_type},
            sort=[('timestamp', -1)],
        )
        if latest is None:
            return None, sensor_type
        state[sensor_type] = latest['value']
    return state, None


@api_bp.route('/api/predict-moisture', methods=['GET'])
def predict_moisture():
    """Return the 24-hour soil-moisture forecast.

    Returns:
        JSON with ``predictions`` and ``generated_at``, or a 404 response
        when a required sensor has no stored reading.
    """
    state, missing = _latest_sensor_state()
    if state is None:
        return jsonify({'error': f'No {missing} readings available'}), 404

    predictions = predictor.predict_next_24h(state)
    return jsonify({
        'predictions': predictions,
        'generated_at': datetime.now(),
    })


@api_bp.route('/api/irrigation-schedule', methods=['GET'])
def get_irrigation_schedule():
    """Return an irrigation plan derived from the moisture forecast.

    Every forecast hour whose predicted moisture is below 30% gets a
    30-minute irrigation entry.

    Returns:
        JSON with ``irrigation_plan``, or a 404 response when a required
        sensor has no stored reading.
    """
    state, missing = _latest_sensor_state()
    if state is None:
        return jsonify({'error': f'No {missing} readings available'}), 404

    predictions = predictor.predict_next_24h(state)
    # Build the plan from the forecast: below 30% moisture needs irrigation
    irrigation_plan = [
        {
            'start_time': pred['timestamp'],
            'duration_minutes': 30,
            'reason': '低土壤湿度预测',
        }
        for pred in predictions
        if pred['predicted_moisture'] < 30
    ]
    return jsonify({'irrigation_plan': irrigation_plan})
七、自动化控制执行
1. 智能灌溉控制器
# app/services/irrigation_controller.py
import RPi.GPIO as GPIO
import time
from datetime import datetime
import threading
class IrrigationController:
    """Drive the water pump and solenoid valve for timed irrigation cycles.

    One cycle runs at a time on a background thread. ``emergency_stop`` wakes
    that thread through an event instead of waiting out the sleep.
    """

    # Class-level defaults so get_status() never hits a missing attribute
    current_zone = None
    start_time = None

    def __init__(self):
        # GPIO pin configuration (BCM numbering)
        self.water_pump_pin = 17
        self.solenoid_valve_pin = 27
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.water_pump_pin, GPIO.OUT)
        GPIO.setup(self.solenoid_valve_pin, GPIO.OUT)
        self.is_irrigating = False
        self.irrigation_thread = None
        self.current_zone = None
        self.start_time = None
        # Guards the check-and-set in start_irrigation
        self._state_lock = threading.Lock()
        # Set by emergency_stop to end the worker's wait early
        self._stop_event = threading.Event()

    def _close_devices(self):
        """Switch the pump and the valve off."""
        GPIO.output(self.water_pump_pin, GPIO.LOW)
        GPIO.output(self.solenoid_valve_pin, GPIO.LOW)

    def start_irrigation(self, zone, duration_minutes):
        """Start an irrigation cycle on a background thread.

        Args:
            zone: Identifier stored in the irrigation log and status.
            duration_minutes: Cycle length in minutes; must be positive.

        Returns:
            True if a cycle was started. False if one is already running, a
            previous worker has not finished yet, or the duration is not
            positive.
        """
        if duration_minutes <= 0:
            return False
        with self._state_lock:
            previous = self.irrigation_thread
            # A worker still shutting down would later switch the devices
            # off under a new cycle, so wait until it has exited.
            if self.is_irrigating or (previous is not None and previous.is_alive()):
                return False
            self.is_irrigating = True
            self.current_zone = zone
            self.start_time = datetime.now()
            self._stop_event.clear()
            self.irrigation_thread = threading.Thread(
                target=self._irrigate_zone,
                args=(zone, duration_minutes),
            )
            self.irrigation_thread.start()
        return True

    def _irrigate_zone(self, zone, duration_minutes):
        """Run one irrigation cycle and log it (worker-thread body)."""
        try:
            # Local import: this module has no file-level import of db.
            # Doing it before the pump opens means an import failure never
            # leaves water running.
            from .database import db

            # Open the pump and the solenoid valve
            GPIO.output(self.water_pump_pin, GPIO.HIGH)
            GPIO.output(self.solenoid_valve_pin, GPIO.HIGH)

            # Record the start of the cycle
            irrigation_record = {
                'zone': zone,
                'start_time': datetime.now(),
                'duration_minutes': duration_minutes,
                'status': 'running',
            }
            db.irrigation_logs.insert_one(irrigation_record)

            # Wait for the cycle to finish; returns True early if stopped
            stopped = self._stop_event.wait(timeout=duration_minutes * 60)

            # Switch the devices off before the database round-trip
            self._close_devices()

            # Update only the fields that changed; _id is immutable
            db.irrigation_logs.update_one(
                {'_id': irrigation_record['_id']},
                {'$set': {
                    'end_time': datetime.now(),
                    'status': 'stopped' if stopped else 'completed',
                }},
            )
        except Exception as e:
            print(f"Irrigation error: {e}")
        finally:
            # Whatever happened above, the devices must end up off
            self._close_devices()
            self.is_irrigating = False
            self.current_zone = None
            self.start_time = None

    def emergency_stop(self):
        """Switch the devices off now and end the running cycle."""
        self._stop_event.set()
        self._close_devices()
        worker = self.irrigation_thread
        if (
            worker is not None
            and worker.is_alive()
            and worker is not threading.current_thread()
        ):
            # The worker wakes as soon as the event is set, so this is short
            worker.join(timeout=5.0)
        self.is_irrigating = False

    def get_status(self):
        """Return the irrigation state, with zone and start time if running."""
        running = self.is_irrigating
        return {
            'is_irrigating': running,
            'current_zone': self.current_zone if running else None,
            'start_time': self.start_time if running else None,
        }
八、部署与优化
1. Docker容器化部署
# Dockerfile
FROM python:3.10-slim
WORKDIR /app
# Install system dependencies (each continued line needs a trailing backslash)
RUN apt-get update && apt-get install -y \
    libglib2.0-0 \
    libsm6 \
    libxext6 \
    libxrender-dev \
    && rm -rf /var/lib/apt/lists/*
# Copy the dependency list first so this layer is cached across code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the application code
COPY . .
# Create a non-root user and give it /app, so the process can write app.log there
RUN useradd -m -u 1000 farmer && chown -R farmer:farmer /app
USER farmer
EXPOSE 5000
CMD ["gunicorn", "run:app", "-b", "0.0.0.0:5000", "--workers", "4"]
2. 性能监控配置
# config.py
import logging
from logging.handlers import RotatingFileHandler
class Config:
    """Application settings and logging setup."""

    # Performance monitoring
    MONITORING_INTERVAL = 60  # seconds
    MAX_LOG_SIZE = 10 * 1024 * 1024  # 10MB
    BACKUP_COUNT = 5

    # Database
    MONGODB_URI = "mongodb://localhost:27017/smart_farm"
    REDIS_URL = "redis://localhost:6379/0"

    # MQTT
    MQTT_BROKER = "localhost"
    MQTT_PORT = 1883

    @staticmethod
    def setup_logging(log_file='app.log'):
        """Attach a rotating file handler to the root logger.

        Safe to call more than once: if the root logger already has a
        rotating handler for the same file, no second one is added, so
        records are not written twice.

        Args:
            log_file: Path of the log file.
        """
        import os

        logger = logging.getLogger()
        logger.setLevel(logging.INFO)

        # RotatingFileHandler stores the absolute path in baseFilename
        target = os.path.abspath(log_file)
        for existing in logger.handlers:
            if isinstance(existing, RotatingFileHandler) and existing.baseFilename == target:
                return

        handler = RotatingFileHandler(
            log_file,
            maxBytes=Config.MAX_LOG_SIZE,
            backupCount=Config.BACKUP_COUNT,
        )
        handler.setLevel(logging.INFO)
        handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))
        logger.addHandler(handler)
九、总结与扩展
通过本教程,您已经掌握了:
- 农业物联网数据采集技术
- 计算机视觉植物健康分析
- 时间序列预测模型构建
- Flask Web应用开发
- 自动化控制系统实现
扩展学习方向:
- 无人机农田监测集成
- 区块链农产品溯源
- 边缘计算设备优化
- 多农场分布式管理