Python机器学习实战:构建智能推荐系统与协同过滤算法 | AI技术应用

2025-10-09 0 996

推荐系统在现代应用中的核心价值

随着大数据时代的到来,个性化推荐已成为电商、内容平台和社交网络的核心功能。Python凭借其丰富的机器学习库生态系统,成为构建推荐系统的首选语言。

推荐系统基础架构

import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.decomposition import TruncatedSVD
import matplotlib.pyplot as plt

class RecommendationSystem:
    """Base class holding the shared user-item rating matrix and model slots.

    Subclasses read ``user_item_matrix`` (a SciPy CSR matrix whose rows are
    users and whose columns are items) and fill in the remaining attributes.
    """

    def __init__(self, data_source):
        """Store the data source handle and initialise empty model state.

        Args:
            data_source: Identifier of the data origin. It is only stored;
                the loader in this class generates simulated data and never
                reads it.
        """
        self.data_source = data_source
        self.user_item_matrix = None
        self.item_similarity = None
        self.user_similarity = None
        self.model = None

    def load_and_preprocess_data(self, n_users=1000, n_items=500, density=0.05, seed=42):
        """Generate a simulated sparse rating matrix and store it on the instance.

        Sets ``user_ids``, ``item_ids`` and ``user_item_matrix``. Ratings are
        integers from 1 to 5; a stored value of 0 means "not rated".

        Args:
            n_users: Number of rows (users) to simulate.
            n_items: Number of columns (items) to simulate.
            density: Fraction of the ``n_users * n_items`` cells to draw.
                Draws that land on the same cell overwrite each other, so
                the real density is at most this value.
            seed: Seed passed to ``np.random.seed``. This reseeds NumPy's
                global generator, which is also what makes two loaders
                called with the same arguments produce identical matrices.

        Raises:
            ValueError: If a dimension is not positive or ``density`` is
                outside ``[0, 1]``.
        """
        if n_users <= 0 or n_items <= 0:
            raise ValueError("n_users and n_items must be positive")
        if not 0 <= density <= 1:
            raise ValueError("density must be within [0, 1]")

        np.random.seed(seed)

        self.user_ids = np.arange(n_users)
        self.item_ids = np.arange(n_items)

        ratings = np.zeros((n_users, n_items))
        n_draws = int(n_users * n_items * density)

        # The (user, item, rating) triples are drawn one at a time, in this
        # exact order, so the random stream - and therefore the generated
        # data for the default arguments - stays the same as before.
        for _ in range(n_draws):
            user = np.random.randint(0, n_users)
            item = np.random.randint(0, n_items)
            ratings[user, item] = np.random.randint(1, 6)

        self.user_item_matrix = csr_matrix(ratings)

        # Report the number of ratings actually stored; it is lower than the
        # number of draws whenever two draws hit the same cell.
        n_ratings = self.user_item_matrix.nnz
        print(f"数据加载完成: {n_users} 用户, {n_items} 物品, {n_ratings} 个评分")

基于用户的协同过滤算法

协同过滤是推荐系统的经典算法,通过寻找相似用户或物品来产生推荐。

用户相似度计算与推荐生成

class UserBasedCF(RecommendationSystem):
    """User-based collaborative filtering over the shared rating matrix.

    A rating is predicted as the similarity-weighted average of the ratings
    given to the item by the most similar users.
    """

    def __init__(self, data_source, k_similar_users=20):
        """Create the recommender.

        Args:
            data_source: Passed through to ``RecommendationSystem``.
            k_similar_users: Maximum number of neighbours whose ratings
                contribute to a single prediction.
        """
        super().__init__(data_source)
        self.k_similar_users = k_similar_users

    def compute_user_similarity(self):
        """Fill ``user_similarity`` with pairwise cosine similarity of users."""
        print("开始计算用户相似度...")

        similarity = cosine_similarity(self.user_item_matrix)
        # A user must never act as their own neighbour.
        np.fill_diagonal(similarity, 0)
        self.user_similarity = similarity

        print("用户相似度计算完成")

    def predict_rating(self, user_id, item_id):
        """Predict the rating ``user_id`` would give ``item_id``.

        Returns 0 when nobody has rated the item. When every selected
        neighbour has zero similarity, the plain mean of their ratings is
        returned instead of a weighted average.
        """
        if self.user_similarity is None:
            self.compute_user_similarity()

        # Ratings of every user for this item, as a dense 1-D vector.
        item_column = self.user_item_matrix[:, item_id].toarray().ravel()
        raters = np.flatnonzero(item_column > 0)
        if raters.size == 0:
            return 0

        weights = self.user_similarity[user_id][raters]
        scores = item_column[raters]

        # Keep only the k raters most similar to the target user.
        k = self.k_similar_users
        if raters.size > k:
            keep = np.argsort(weights)[-k:]
            weights, scores = weights[keep], scores[keep]

        weight_total = np.sum(np.abs(weights))
        if weight_total == 0:
            return np.mean(scores) if len(scores) > 0 else 0

        return np.dot(weights, scores) / weight_total

    def recommend_for_user(self, user_id, n_recommendations=10):
        """Return up to ``n_recommendations`` ``(item_id, score)`` pairs.

        Only items the user has not rated are considered, items with a
        non-positive prediction are dropped, and the result is ordered by
        descending predicted rating.
        """
        rated_row = self.user_item_matrix[user_id].toarray().ravel()
        candidates = np.flatnonzero(rated_row == 0)

        scored = ((item, self.predict_rating(user_id, item)) for item in candidates)
        ranked = sorted(
            (pair for pair in scored if pair[1] > 0),
            key=lambda pair: pair[1],
            reverse=True,
        )
        return ranked[:n_recommendations]

基于矩阵分解的隐语义模型

矩阵分解技术能够发现用户和物品的潜在特征,有效缓解数据稀疏性问题。

基于梯度下降的矩阵分解实现(Funk-SVD风格)

class MatrixFactorizationRecommender(RecommendationSystem):
    """Latent-factor recommender trained with per-rating gradient descent.

    Every user and every item gets an ``n_factors``-dimensional vector; a
    predicted rating is the dot product of the two vectors.
    """

    def __init__(self, data_source, n_factors=50, learning_rate=0.01, regularization=0.02):
        """Create an untrained model.

        Args:
            data_source: Passed through to ``RecommendationSystem``.
            n_factors: Length of each latent vector.
            learning_rate: Step size of every gradient update.
            regularization: L2 penalty weight applied to both factor vectors.
        """
        super().__init__(data_source)
        self.n_factors = n_factors
        self.learning_rate = learning_rate
        self.regularization = regularization
        self.user_factors = None
        self.item_factors = None

    def fit(self, n_iterations=100):
        """Train the factor matrices on the stored non-zero ratings.

        The factors are re-initialised from NumPy's global random generator
        on every call. Training RMSE is printed every tenth iteration. When
        the matrix holds no ratings the factors are initialised and the
        method returns without training.

        Args:
            n_iterations: Number of full passes over the observed ratings.
        """
        n_users, n_items = self.user_item_matrix.shape

        self.user_factors = np.random.normal(0, 0.1, (n_users, self.n_factors))
        self.item_factors = np.random.normal(0, 0.1, (n_items, self.n_factors))

        # Take coordinates and values from the same COO view and apply one
        # shared mask. Pairing ``nonzero()`` with ``.data`` would misalign
        # them as soon as the matrix stores an explicit zero, because
        # ``nonzero()`` drops such entries while ``.data`` keeps them.
        coo = self.user_item_matrix.tocoo()
        observed = coo.data != 0
        users = coo.row[observed]
        items = coo.col[observed]
        ratings = coo.data[observed]
        n_observed = len(ratings)

        print(f"开始训练矩阵分解模型,迭代次数: {n_iterations}")

        if n_observed == 0:
            # Nothing to learn from; also avoids dividing by zero in the RMSE.
            return

        lr = self.learning_rate
        reg = self.regularization

        for iteration in range(n_iterations):
            total_error = 0
            for user, item, true_rating in zip(users, items, ratings):
                user_vec = self.user_factors[user]
                item_vec = self.item_factors[item]

                error = true_rating - np.dot(user_vec, item_vec)
                total_error += error ** 2

                # Both gradients are computed from the pre-update vectors
                # before either vector is modified.
                user_grad = -2 * error * item_vec + 2 * reg * user_vec
                item_grad = -2 * error * user_vec + 2 * reg * item_vec

                self.user_factors[user] -= lr * user_grad
                self.item_factors[item] -= lr * item_grad

            if iteration % 10 == 0:
                rmse = np.sqrt(total_error / n_observed)
                print(f"迭代 {iteration}, RMSE: {rmse:.4f}")

    def predict(self, user_id, item_id):
        """Return the predicted rating of ``item_id`` for ``user_id``.

        Raises:
            ValueError: If ``fit`` has not been called yet.
        """
        if self.user_factors is None or self.item_factors is None:
            raise ValueError("模型尚未训练")

        return np.dot(self.user_factors[user_id], self.item_factors[item_id])

    def recommend(self, user_id, n_recommendations=10):
        """Return the top ``(item_id, score)`` pairs among unrated items.

        The result is ordered by descending predicted rating and holds at
        most ``n_recommendations`` entries.
        """
        user_ratings = self.user_item_matrix[user_id].toarray().flatten()
        unrated_items = np.where(user_ratings == 0)[0]

        predictions = [
            (item_id, self.predict(user_id, item_id)) for item_id in unrated_items
        ]
        predictions.sort(key=lambda pair: pair[1], reverse=True)
        return predictions[:n_recommendations]

混合推荐系统架构

结合多种推荐算法,构建更强大、更稳定的混合推荐系统。

加权混合推荐引擎

class HybridRecommender:
    """Weighted blend of a user-based CF model and a matrix-factorization model."""

    def __init__(self, data_source):
        """Create both sub-models with the default 0.4 / 0.6 blend weights.

        Args:
            data_source: Passed through to both sub-models.
        """
        self.data_source = data_source
        self.user_based_cf = UserBasedCF(data_source)
        self.matrix_factorization = MatrixFactorizationRecommender(data_source)
        self.weights = {'user_cf': 0.4, 'mf': 0.6}

    def load_data(self):
        """Load the rating data into every sub-model."""
        self.user_based_cf.load_and_preprocess_data()
        self.matrix_factorization.load_and_preprocess_data()

    def train_models(self):
        """Train every sub-model on its loaded data."""
        print("训练基于用户的协同过滤模型...")
        self.user_based_cf.compute_user_similarity()

        print("训练矩阵分解模型...")
        self.matrix_factorization.fit(n_iterations=50)

    def hybrid_predict(self, user_id, item_id):
        """Return the weighted blend of both sub-model predictions.

        Each sub-model prediction is clamped to ``[0, 5]`` before blending.
        This is a best-effort call: any failure in a sub-model is printed
        and reported as a prediction of 0 instead of being raised.
        """
        try:
            cf_pred = self.user_based_cf.predict_rating(user_id, item_id)
            mf_pred = self.matrix_factorization.predict(user_id, item_id)

            # Clamp both predictions to the 0-5 range before mixing.
            cf_pred_norm = max(0, min(cf_pred, 5))
            mf_pred_norm = max(0, min(mf_pred, 5))

            return (self.weights['user_cf'] * cf_pred_norm +
                    self.weights['mf'] * mf_pred_norm)

        except Exception as e:
            print(f"预测失败: {e}")
            return 0

    def ensemble_recommend(self, user_id, n_recommendations=10):
        """Return the top blended ``(item_id, score)`` pairs for a user.

        Only items the user has not rated (according to the CF sub-model's
        matrix) and whose blended prediction is positive are returned,
        ordered by descending score.
        """
        user_ratings = self.user_based_cf.user_item_matrix[user_id].toarray().flatten()
        unrated_items = np.where(user_ratings == 0)[0]

        predictions = []
        for item_id in unrated_items:
            predicted_rating = self.hybrid_predict(user_id, item_id)
            if predicted_rating > 0:
                predictions.append((item_id, predicted_rating))

        predictions.sort(key=lambda pair: pair[1], reverse=True)
        return predictions[:n_recommendations]

    def optimize_weights(self, test_users, true_ratings):
        """Grid-search the blend weights that minimise RMSE on ``true_ratings``.

        CF weights 0.1, 0.2, ... are tried with the MF weight set to the
        complement. The best pair is stored in ``self.weights``. When
        ``true_ratings`` is empty, or no candidate produces a finite
        improvement, the current weights are kept.

        Args:
            test_users: Unused; kept so existing callers keep working.
            true_ratings: Iterable of ``(user_id, item_id, true_rating)``.
        """
        samples = list(true_ratings)
        if not samples:
            # No validation data: an RMSE cannot be computed, keep the weights.
            return

        previous_weights = self.weights
        best_weights = None
        best_score = float('inf')

        for cf_weight in np.arange(0.1, 1.0, 0.1):
            candidate = {'user_cf': float(cf_weight), 'mf': float(1.0 - cf_weight)}
            # hybrid_predict reads self.weights, so install the candidate first.
            self.weights = candidate

            squared_error = sum(
                (self.hybrid_predict(user_id, item_id) - true_rating) ** 2
                for user_id, item_id, true_rating in samples
            )
            rmse = np.sqrt(squared_error / len(samples))

            if rmse < best_score:
                best_score = rmse
                best_weights = candidate

        # Never leave the model without weights if every candidate failed.
        self.weights = best_weights if best_weights is not None else previous_weights
        print(f"最优权重: {best_weights}, RMSE: {best_score:.4f}")

评估与性能优化

建立完整的评估体系,确保推荐系统的效果和性能。

多维度评估框架

class RecommendationEvaluator:
    """Compute rating-error and ranking metrics for a hybrid recommender.

    The recommender must expose ``hybrid_predict(user_id, item_id)`` and
    ``ensemble_recommend(user_id, k)``. Ground truth comes from
    ``test_data``. Because ``ensemble_recommend`` only proposes items the
    user has not rated in the training matrix, the ranking metrics are only
    meaningful when ``test_data`` is held out from training.
    """

    def __init__(self, recommender, test_data, relevance_threshold=4):
        """Store the recommender and its evaluation data.

        Args:
            recommender: Object providing ``hybrid_predict`` and
                ``ensemble_recommend``.
            test_data: Re-iterable collection of
                ``(user_id, item_id, true_rating)`` triples.
            relevance_threshold: Minimum true rating for an item to count
                as relevant in the ranking metrics.
        """
        self.recommender = recommender
        self.test_data = test_data
        self.relevance_threshold = relevance_threshold

    def _relevant_items_by_user(self):
        """Map each test user to the set of items they rated as relevant."""
        relevant = {}
        for user_id, item_id, true_rating in self.test_data:
            if true_rating >= self.relevance_threshold:
                relevant.setdefault(user_id, set()).add(item_id)
        return relevant

    def _recommended_items(self, user_id, k):
        """Return the set of item ids in the user's top-``k`` recommendations."""
        return {rec[0] for rec in self.recommender.ensemble_recommend(user_id, k)}

    def calculate_rmse(self):
        """Return the RMSE of ``hybrid_predict`` over the test data.

        Returns ``inf`` when the test data is empty.
        """
        total_error = 0
        count = 0

        for user_id, item_id, true_rating in self.test_data:
            pred_rating = self.recommender.hybrid_predict(user_id, item_id)
            total_error += (pred_rating - true_rating) ** 2
            count += 1

        return np.sqrt(total_error / count) if count > 0 else float('inf')

    def calculate_precision_at_k(self, k=10):
        """Return mean Precision@K over users with at least one relevant item.

        Relevance is taken from the test data rather than from ratings the
        recommender was trained on: already-rated items are never
        recommended, so using them as ground truth would always yield 0.
        Users who receive no recommendations are skipped. Returns 0 when no
        user can be scored.
        """
        precisions = []
        for user_id, relevant in self._relevant_items_by_user().items():
            recommended = self._recommended_items(user_id, k)
            if recommended:
                precisions.append(len(relevant & recommended) / len(recommended))

        return sum(precisions) / len(precisions) if precisions else 0

    def calculate_recall_at_k(self, k=10):
        """Return mean Recall@K over users with at least one relevant item.

        Relevance is taken from the test data (see
        ``calculate_precision_at_k``). A user with no recommendations
        contributes a recall of 0. Returns 0 when no user can be scored.
        """
        recalls = []
        for user_id, relevant in self._relevant_items_by_user().items():
            recommended = self._recommended_items(user_id, k)
            recalls.append(len(relevant & recommended) / len(relevant))

        return sum(recalls) / len(recalls) if recalls else 0

    def comprehensive_evaluation(self):
        """Print and return RMSE, Precision@10, Recall@10 and F1@10.

        Returns:
            dict with keys ``'rmse'``, ``'precision@10'``, ``'recall@10'``
            and ``'f1@10'``.
        """
        rmse = self.calculate_rmse()
        precision_10 = self.calculate_precision_at_k(10)
        recall_10 = self.calculate_recall_at_k(10)

        denominator = precision_10 + recall_10
        f1_10 = 2 * (precision_10 * recall_10) / denominator if denominator > 0 else 0

        print("=== 推荐系统评估结果 ===")
        print(f"RMSE: {rmse:.4f}")
        print(f"Precision@10: {precision_10:.4f}")
        print(f"Recall@10: {recall_10:.4f}")
        print(f"F1-Score@10: {f1_10:.4f}")

        return {
            'rmse': rmse,
            'precision@10': precision_10,
            'recall@10': recall_10,
            'f1@10': f1_10
        }

实时推荐与在线学习

实现能够适应数据变化的在线学习推荐系统。

增量学习推荐器

class OnlineRecommender(HybridRecommender):
    """Hybrid recommender that folds new interactions in incrementally.

    Interactions are buffered by ``record_interaction`` and applied in a
    batch every ``update_frequency`` recorded interactions.
    """

    def __init__(self, data_source, update_frequency=1000):
        """Create the recommender with an empty interaction buffer.

        Args:
            data_source: Passed through to ``HybridRecommender``.
            update_frequency: Number of recorded interactions between two
                incremental updates.
        """
        super().__init__(data_source)
        self.update_frequency = update_frequency
        self.interaction_count = 0
        self.recent_interactions = []

    def record_interaction(self, user_id, item_id, rating):
        """Buffer one interaction and run an update when a batch is full."""
        self.recent_interactions.append((user_id, item_id, rating))
        self.interaction_count += 1

        if self.interaction_count % self.update_frequency == 0:
            self.incremental_update()

    def incremental_update(self):
        """Apply all buffered interactions to the data and both models."""
        print(f"开始增量更新,处理 {len(self.recent_interactions)} 个新交互")

        if not self.recent_interactions:
            return

        # Keep both sub-models' matrices in sync so that they keep
        # describing the same data after the update.
        self._apply_interactions(self.user_based_cf)
        self._apply_interactions(self.matrix_factorization)

        self.update_user_similarity()
        self.incremental_mf_update()

        self.recent_interactions = []
        print("增量更新完成")

    def _apply_interactions(self, model):
        """Write the buffered ratings into ``model.user_item_matrix``.

        Interactions whose user or item index lies outside the matrix are
        ignored.
        """
        n_users, n_items = model.user_item_matrix.shape

        # Changing the sparsity structure of a CSR matrix cell by cell is
        # expensive, so edit a LIL copy and convert back once.
        editable = model.user_item_matrix.tolil()
        for user_id, item_id, rating in self.recent_interactions:
            if 0 <= user_id < n_users and 0 <= item_id < n_items:
                editable[user_id, item_id] = rating
        model.user_item_matrix = editable.tocsr()

    def update_user_similarity(self):
        """Recompute the similarity rows and columns of the affected users.

        Does nothing when the similarity matrix has not been computed yet;
        ``UserBasedCF.predict_rating`` builds it on first use.
        """
        cf_model = self.user_based_cf
        similarity = cf_model.user_similarity
        if similarity is None:
            return

        n_users = similarity.shape[0]
        affected = sorted(
            {user_id for user_id, _, _ in self.recent_interactions
             if 0 <= user_id < n_users}
        )
        if not affected:
            return

        # One call yields the similarity of every affected user to all users.
        rows = cosine_similarity(cf_model.user_item_matrix[affected],
                                 cf_model.user_item_matrix)
        similarity[affected, :] = rows
        similarity[:, affected] = rows.T
        # Self-similarity stays excluded, as in compute_user_similarity.
        similarity[affected, affected] = 0

    def incremental_mf_update(self, learning_rate=0.001):
        """Take one gradient step per buffered interaction on the MF factors.

        Does nothing when the matrix-factorization model is untrained.
        Interactions with out-of-range indices are ignored.

        Args:
            learning_rate: Step size of each single-sample update.
        """
        mf_model = self.matrix_factorization
        if mf_model.user_factors is None or mf_model.item_factors is None:
            return

        n_users = mf_model.user_factors.shape[0]
        n_items = mf_model.item_factors.shape[0]
        reg = mf_model.regularization

        for user_id, item_id, rating in self.recent_interactions:
            if not (0 <= user_id < n_users and 0 <= item_id < n_items):
                continue

            user_vec = mf_model.user_factors[user_id]
            item_vec = mf_model.item_factors[item_id]
            error = rating - np.dot(user_vec, item_vec)

            # Both steps are computed from the pre-update vectors.
            user_grad = error * item_vec - reg * user_vec
            item_grad = error * user_vec - reg * item_vec

            mf_model.user_factors[user_id] += learning_rate * user_grad
            mf_model.item_factors[item_id] += learning_rate * item_grad

完整系统演示与部署

整合所有组件,构建完整的推荐系统应用。

def demo_recommendation_system():
    """Run the end-to-end demo: load, train, recommend, evaluate, update online.

    Returns:
        The metrics dict produced by
        ``RecommendationEvaluator.comprehensive_evaluation``.
    """
    print("=== Python智能推荐系统演示 ===")

    data_source = "simulated_data"
    hybrid_rec = HybridRecommender(data_source)

    print("1. 加载数据...")
    hybrid_rec.load_data()

    print("2. 训练模型...")
    hybrid_rec.train_models()

    print("3. 生成推荐...")
    test_user_id = 42
    recommendations = hybrid_rec.ensemble_recommend(test_user_id, 5)

    print(f"为用户 {test_user_id} 生成的推荐:")
    for i, (item_id, score) in enumerate(recommendations, 1):
        print(f"  {i}. 物品 {item_id} (预测评分: {score:.3f})")

    print("4. 系统评估...")
    # Sample test triples from the loaded matrix. A real application should
    # evaluate on data held out from training instead.
    rating_matrix = hybrid_rec.user_based_cf.user_item_matrix
    n_users, n_items = rating_matrix.shape
    test_data = []
    for _ in range(100):
        user_id = np.random.randint(0, n_users)
        item_id = np.random.randint(0, n_items)
        rating = rating_matrix[user_id, item_id]
        if rating > 0:
            test_data.append((user_id, item_id, rating))

    evaluator = RecommendationEvaluator(hybrid_rec, test_data)
    evaluation_results = evaluator.comprehensive_evaluation()

    print("5. 在线学习演示...")
    # Match the update frequency to the number of simulated interactions so
    # the final one triggers an incremental update. With the default of 1000
    # the demo would record interactions without ever updating the models.
    n_new_interactions = 10
    online_rec = OnlineRecommender(data_source, update_frequency=n_new_interactions)
    online_rec.load_data()
    online_rec.train_models()

    print("模拟新用户交互...")
    for _ in range(n_new_interactions):
        user_id = np.random.randint(0, 100)
        item_id = np.random.randint(0, 500)
        rating = np.random.randint(1, 6)
        online_rec.record_interaction(user_id, item_id, rating)

    print("=== 演示完成 ===")
    return evaluation_results

# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    demo_recommendation_system()

总结与最佳实践

通过本教程,我们构建了一个完整的Python智能推荐系统,具备以下核心特性:

  • 多算法融合:结合协同过滤和矩阵分解的优势
  • 实时推荐:支持增量学习和在线更新
  • 全面评估:多维度指标评估系统性能
  • 可扩展架构:易于集成新算法和功能
  • 生产就绪:包含错误处理和性能优化

生产环境部署建议

  1. 使用Redis或Memcached缓存热门推荐结果
  2. 实现A/B测试框架验证算法效果
  3. 建立监控系统跟踪推荐质量指标
  4. 使用Docker容器化部署确保环境一致性
  5. 实现推荐解释功能增强用户信任

推荐系统是机器学习最具商业价值的应用之一,掌握这些技术将为你在电商、内容平台和社交网络等领域的发展提供强大助力。

Python机器学习实战:构建智能推荐系统与协同过滤算法 | AI技术应用
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 python Python机器学习实战:构建智能推荐系统与协同过滤算法 | AI技术应用 https://www.taomawang.com/server/python/1183.html

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务