四、离线推荐服务建设(基于隐语义模型的协同过滤推荐)

4.1 离线推荐服务

离线推荐服务是综合用户所有的历史数据,利用设定的离线统计算法和离线推荐算法周期性的进行结果统计与保存,计算的结果在一定时间周期内是固定不变的,变更的频率取决于算法调度的频率。

离线推荐服务主要计算一些可以预先进行统计和计算的指标,为实时计算和前端业务相应提供数据支撑。

离线推荐服务主要分为统计推荐、基于隐语义模型的协同过滤推荐以及基于内容和基于Item-CF的相似推荐。我们这一篇主要介绍基于隐语义模型的协同过滤推荐。

4.2 离线统计服务

详见:电商推荐系统四: 离线推荐服务建设(基于统计的推荐)

4.3 基于隐语义模型的协同过滤推荐

项目采用ALS作为协同过滤算法,根据MongoDB中的用户评分表计算离线的用户商品推荐列表以及商品相似度矩阵。

4.3.1 用户商品推荐列表

通过ALS训练出来的Model来计算所有当前用户商品的推荐列表,主要思路如下:

  1. userId和productId做笛卡尔积,产生(userId,productId)的元组
  2. 通过模型预测(userId,productId)对应的评分。
  3. 将预测结果通过预测分值进行排序。
  4. 返回分值最大的K个商品,作为当前用户的推荐列表。

最后生成的数据结构如下:将数据保存到MongoDB的UserRecs表中
在这里插入图片描述

新建recommender的子项目OfflineRecommender,引入spark、scala、mongo和jblas的依赖:

<dependencies>

    <dependency>
        <groupId>org.scalanlp</groupId>
        <artifactId>jblas</artifactId>
        <version>${jblas.version}</version>
    </dependency>

    <!-- Spark的依赖引入 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.11</artifactId>
    </dependency>
    <!-- 引入Scala -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>

    <!-- 加入MongoDB的驱动 -->
    <!-- 用于代码方式连接MongoDB -->
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>casbah-core_2.11</artifactId>
        <version>${casbah.version}</version>
    </dependency>
    <!-- 用于Spark和MongoDB的对接 -->
    <dependency>
        <groupId>org.mongodb.spark</groupId>
        <artifactId>mongo-spark-connector_2.11</artifactId>
        <version>${mongodb-spark.version}</version>
    </dependency>
</dependencies>

同样经过前期的构建样例类、声明配置、创建SparkSession等步骤,可以加载数据开始计算模型了。

核心代码如下:
src/main/scala/com.recom.offline/OfflineRecommender.scala

case class ProductRating(userId: Int, productId: Int, score: Double, timestamp: Int)

case class MongoConfig(uri:String, db:String)

// 标准推荐对象,productId,score
case class Recommendation(productId: Int, score:Double)

// 用户推荐列表
case class UserRecs(userId: Int, recs: Seq[Recommendation])

// 商品相似度(商品推荐)
case class ProductRecs(productId: Int, recs: Seq[Recommendation])

object OfflineRecommmeder {

  // 定义常量
  val MONGODB_RATING_COLLECTION = "Rating"

  // 推荐表的名称
  val USER_RECS = "UserRecs"
  val PRODUCT_RECS = "ProductRecs"

  val USER_MAX_RECOMMENDATION = 20

  def main(args: Array[String]): Unit = {
    // 定义配置
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://localhost:27017/recommender",
      "mongo.db" -> "recommender"
    )

    // 创建spark session
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    implicit val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))

    import spark.implicits._
//读取mongoDB中的业务数据
val ratingRDD = spark
.read
.option("uri",mongoConfig.uri)
.option("collection",MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.rdd
.map(rating=> (rating.userId, rating.productId, rating.score)).cache()
//用户的数据集 RDD[Int]
val userRDD = ratingRDD.map(_._1).distinct()
val prodcutRDD = ratingRDD.map(_._2).distinct()

//创建训练数据集
val trainData = ratingRDD.map(x => Rating(x._1,x._2,x._3))
// rank 是模型中隐语义因子的个数, iterations 是迭代的次数, lambda 是ALS的正则化参
val (rank,iterations,lambda) = (50, 5, 0.01)
// 调用ALS算法训练隐语义模型
val model = ALS.train(trainData,rank,iterations,lambda)

//计算用户推荐矩阵
val userProducts = userRDD.cartesian(productRDD)
// model已训练好,把id传进去就可以得到预测评分列表RDD[Rating] (userId,productId,rating)
val preRatings = model.predict(userProducts)

val userRecs = preRatings
.filter(_.rating > 0)
.map(rating => (rating.user,(rating.product, rating.rating)))
.groupByKey()    
.map{
case (userId,recs) => UserRecs(userId,recs.toList.sortWith(_._2 >
_._2).take(USER_MAX_RECOMMENDATION).map(x => Recommendation(x._1,x._2)))
}.toDF()

userRecs.write
.option("uri",mongoConfig.uri)
.option("collection",USER_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()

//TODO:计算商品相似度矩阵

// 关闭spark
spark.stop()
}
}

4.3.2 商品相似度矩阵

通过ALS计算商品相似度矩阵,该矩阵用于查询当前商品的相似商品并为实时推荐系统服务。

离线计算的ALS 算法,算法最终会为用户、商品分别生成最终的特征矩阵,分别是表示用户特征矩阵的U(m x k)矩阵,每个用户由 k个特征描述;表示物品特征矩阵的V(n x k)矩阵,每个物品也由 k 个特征描述。

V(n x k)表示物品特征矩阵,每一行是一个 k 维向量,虽然我们并不知道每一个维度的特征意义是什么,但是k 个维度的数学向量表示了该行对应商品的特征。

在这里插入图片描述

数据集中任意两个商品间相似度都可以由公式计算得到,商品与商品之间的相似度在一段时间内基本是固定值。最后生成的数据保存到MongoDB的ProductRecs表中。
在这里插入图片描述

核心代码如下:

//计算商品相似度矩阵
//获取商品的特征矩阵,数据格式 RDD[(scala.Int, scala.Array[scala.Double])]
val productFeatures = model.productFeatures.map{case (productId,features) =>
  (productId, new DoubleMatrix(features))
}

// 计算笛卡尔积并过滤合并
val productRecs = productFeatures.cartesian(productFeatures)
  .filter{case (a,b) => a._1 != b._1}  
  .map{case (a,b) =>
    val simScore = this.consinSim(a._2,b._2) // 求余弦相似度
    (a._1,(b._1,simScore))
  }.filter(_._2._2 > 0.6)    
  .groupByKey()             
  .map{case (productId,items) =>
    ProductRecs(productId,items.toList.map(x => Recommendation(x._1,x._2)))
  }.toDF()

productRecs
  .write
  .option("uri", mongoConfig.uri)
  .option("collection",PRODUCT_RECS)
  .mode("overwrite")
  .format("com.mongodb.spark.sql")
  .save()
其中,consinSim是求两个向量余弦相似度的函数,代码实现如下:
//计算两个商品之间的余弦相似度
def consinSim(product1: DoubleMatrix, product2:DoubleMatrix) : Double ={
  product1.dot(product2) / ( product1.norm2()  * product2.norm2() )
}

4.3.3 模型评估和参数选取

在上述模型训练的过程中,我们直接给定了隐语义模型的rank,iterations,lambda三个参数。对于我们的模型,这并不一定是最优的参数选取,所以我们需要对模型进行评估。通常的做法是计算均方根误差(RMSE),考察预测评分与实际评分之间的误差。
在这里插入图片描述

有了RMSE,我们可以就可以通过多次调整参数值,来选取RMSE最小的一组作为我们模型的优化选择。

在scala/com.recom.offline/下新建单例对象ALSTrainer,代码主体架构如下:

def main(args: Array[String]): Unit = {
  val config = Map(
    "spark.cores" -> "local[*]",
    "mongo.uri" -> "mongodb://localhost:27017/recommender",
    "mongo.db" -> "recommender"
  )
  //创建SparkConf
  val sparkConf = new SparkConf().setAppName("ALSTrainer").setMaster(config("spark.cores"))
  //创建SparkSession
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()

  val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))

  import spark.implicits._

  //加载评分数据
  val ratingRDD = spark
    .read
    .option("uri",mongoConfig.uri)
    .option("collection",OfflineRecommender.MONGODB_RATING_COLLECTION)
    .format("com.mongodb.spark.sql")
    .load()
    .as[ProductRating]
    .rdd
    .map(rating => Rating(rating.userId,rating.productId,rating.score)).cache()

  // 将一个RDD随机切分成两个RDD,用以划分训练集和测试集
  val splits = ratingRDD.randomSplit(Array(0.8, 0.2))

  val trainingRDD = splits(0)
  val testingRDD = splits(1)

  //输出最优参数
  adjustALSParams(trainingRDD, testingRDD)

  //关闭Spark
  spark.close()
}

其中adjustALSParams方法是模型评估的核心,输入一组训练数据和测试数据,输出计算得到最小RMSE的那组参数。代码实现如下:
// 输出最终的最优参数

def adjustALSParams(trainData:RDD[Rating], testData:RDD[Rating]): Unit ={
// 这里指定迭代次数为5,rank和lambda在几个值中选取调整
  val result = for(rank <- Array(100,200,250); lambda <- Array(1, 0.1, 0.01, 0.001))
    yield {
      val model = ALS.train(trainData,rank,5,lambda)
      val rmse = getRMSE(model, testData)
      (rank,lambda,rmse)
    }
  // 按照rmse排序
  println(result.sortBy(_._3).head)
}

计算RMSE的函数getRMSE代码实现如下:

def getRMSE(model:MatrixFactorizationModel, data:RDD[Rating]):Double={
  val userProducts = data.map(item => (item.user,item.product))
  val predictRating = model.predict(userProducts)
val real = data.map(item => ((item.user,item.product),item.rating))
  val predict = predictRating.map(item => ((item.user,item.product),item.rating))
  // 计算RMSE
  sqrt(
    real.join(predict).map{case ((userId,productId),(real,pre))=>
      // 真实值和预测值之间的差
      val err = real - pre
      err * err
    }.mean()
  )
}

运行代码,我们就可以得到目前数据的最优模型参数。

4.4 附件:完整代码

package com.recom.offline

import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.jblas.DoubleMatrix

case class ProductRating(userId: Int,productId:Int,score:Double,timestamp:Int)
case class MongoConfig(uri:String,db:String)

//定义标准推荐对象
case class Recommendation(productId: Int,score:Double)
//定义用户推荐列表
case class UserRecs(userId:Int,recs:Seq[Recommendation])
//定义商品相似度列表
case class ProductRecs(productId:Int,recs:Seq[Recommendation])


object OfflineRecommender {
  
  //定义monogobd中存储的表名
  val MONGODB_RATING_COLLECTION = "Rating"
  val USER_RECS="UserRecs"
  val PRODUCT_RECS="ProductRecs"
  val USER_MAX_RECOMMENDATION=20
  
  def main(args: Array[String]): Unit = {
  
    //定义基础配置的集合(可以放入配置文件,通过方法获取属性的值)
    val config = Map(
      "spark.cores"->"local[*]",
      "mongo.uri"->"mongodb://hadoop102:27017/recommender",
      "mongo.db"->"recommender"
    )
  
    //创建一个spark config
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
    //创建一个spark session
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  
    //导入隐式转换类,在DF和DS转换的过程中会使用到
    import spark.implicits._
    //通过隐式类的方法创建mongodb连接对象
    implicit val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))
    
    //加载数据
    val ratingRDD = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[ProductRating]
      .rdd
      .map(rating=>(rating.userId,rating.productId,rating.score))
      .cache()
    
    //取出所有的用户和商品的数据集,为隐语义模型训练做准备
    val userRDD: RDD[Int] = ratingRDD.map(_._1).distinct()
    val productRDD: RDD[Int] = ratingRDD.map(_._2).distinct()
    
    
    //核心计算过程
    //1.训练隐语义模型
    val trainData = ratingRDD.map(x=>Rating(x._1,x._2,x._3))
    //定义隐语义模型的训练参数,rank隐特征个数,iterations迭代次数,lambda:正则化系数
    val (rank,iterations,lambda)=(5,10,0.01)
    val model = ALS.train(trainData,rank,iterations,lambda)
    
    
    //2.获取预测评分矩阵,得到用户推荐列表
    //用userRDD和productRDD做一个笛卡尔积,得到空的userProductsRDD表示的评分矩阵
    val userProducts: RDD[(Int, Int)] = userRDD.cartesian(productRDD)
    val preRating: RDD[Rating] = model.predict(userProducts)
    
    //从预测评分矩阵中提取得到用户的推荐列表
    val userRecs: DataFrame = preRating.filter(_.rating > 0)
      .map(rating => (rating.user, (rating.product, rating.rating)))
      .groupByKey()
      .map {
        case (userId, recs) =>
          val recomTopN: List[(Int, Double)] = recs.toList.sortWith(_._2 > _._2).take(USER_MAX_RECOMMENDATION)
          val recommendations: List[Recommendation] = recomTopN.map(x => Recommendation(x._1, x._2))
          UserRecs(userId, recommendations)
      }.toDF()
    userRecs.write
        .option("uri",mongoConfig.uri)
        .option("collection",USER_RECS)
        .mode("overwrite")
        .format("com.mongodb.spark.sql")
        .save()
    
    
    //3.利用商品特征向量,计算商品的相似度列表
    val productFeatures: RDD[(Int, DoubleMatrix)] = model.productFeatures
      .map { case (productId, features) => (productId, new DoubleMatrix(features)) }
    //两两配对商品,计算余弦相似度
    val productRecs: DataFrame = productFeatures.cartesian(productFeatures)
      .filter { case (x, y) => x._1 != y._1 }
      //计算余弦相似度
      .map { case (a, b) => {
        val simScore = consinSim(a._2, b._2)
        (a._1, (b._1, simScore))
      }
      }
      .filter(_._2._2 > 0.4)
      .groupByKey()
      .map {
        case (productId, recs) =>
          ProductRecs(productId, recs.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)))
      }.toDF()
    productRecs.write
      .option("uri",mongoConfig.uri)
      .option("collection",PRODUCT_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()
    
    
    //4.利用用户特征向量,计算用户的相似度列表
    /*
    val userFeatures: RDD[(Int, DoubleMatrix)] = model.userFeatures
      .map { case (userId, features) => (userId, new DoubleMatrix(features)) }
    //两两配对商品,计算余弦相似度
    val userRecs: DataFrame = userFeatures.cartesian(userFeatures)
      .filter { case (x, y) => x._1 != y._1 }
      //计算余弦相似度
      .map { case (a, b) => {
        val simScore = consinSim(a._2, b._2)
        (a._1, (b._1, simScore))
      }
      }
      .filter(_._2._2 > 0.4)
      .groupByKey()
      .map {
        case (userId, recs) =>
          ProductRecs(userId, recs.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)))
      }.toDF()
    */
    
    spark.stop()
    
    
  }

  def consinSim(product1: DoubleMatrix, product2: DoubleMatrix) ={
  
    product1.dot(product2) / (product1.norm2() * product2.norm2())
  
  }

}

模型调优:

package com.recom.offline

import breeze.numerics.sqrt
import com.recom.offline.OfflineRecommender.MONGODB_RATING_COLLECTION
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object ALSTrainer {
  
  def main(args: Array[String]): Unit = {
  
    //定义基础配置的集合(可以放入配置文件,通过方法获取属性的值)
    val config = Map(
      "spark.cores"->"local[*]",
      "mongo.uri"->"mongodb://hadoop102:27017/recommender",
      "mongo.db"->"recommender"
    )
  
    //创建一个spark config
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ALSTrainer")
    //创建一个spark session
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  
    //导入隐式转换类,在DF和DS转换的过程中会使用到
    import spark.implicits._
    //通过隐式类的方法创建mongodb连接对象
    implicit val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))
  
    //加载数据
    val ratingRDD = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[ProductRating]
      .rdd
      .map(rating=>Rating(rating.userId,rating.productId,rating.score))
      .cache()
    
    
    //将数据切分为训练集和测试集
    val splits= ratingRDD.randomSplit(Array(0.8,0.2))
    val trainingRDD=splits(0)
    val testingRDD=splits(1)
    
    //核心实现:输出最优参数
    adjustALSParams(trainingRDD,testingRDD)
    
    spark.stop()
    
  }
  
  
 def adjustALSParams(trainData: RDD[Rating], testData: RDD[Rating])={
   
    //定义训练参数
    val ranks=Array(5,10,20,50)
    val intertions=Array(10,25,50)
    val lambdas=Array(1,0.1,0.01)
    //遍历数组中定义的参数取值(加上迭代次数后会非常吃机器的性能)
//    val result= for (rank<-ranks;intertion<-intertions;lambda<-lambdas) yield{
//        val model: MatrixFactorizationModel = ALS.train(trainData,rank,intertion,lambda)
//        val rmse = getRMSE(model,testData)
//        (rank,intertion,lambda,rmse)
//      }
  
   val result= for (rank<-ranks;lambda<-lambdas) yield{
     val model: MatrixFactorizationModel = ALS.train(trainData,rank,10,lambda)
     val rmse = getRMSE(model,testData)
     (rank,lambda,rmse)
   }
   for (elem <- result) {
     println(elem)
   }
    
  }
  
  
  def getRMSE(model: MatrixFactorizationModel, data: RDD[Rating])={
    
    //构建userProducts,得到预测评分矩阵
    val userProducts: RDD[(Int, Int)] = data.map(item=>(item.user,item.product))
    val predictRating = model.predict(userProducts)
    
    //按照公式计算rmse,首先把预测评分和实际评分按照(userId,productId)做一个连接
    val observed: RDD[((Int, Int), Double)] = data.map(item=>((item.user,item.product),item.rating))
    val predict: RDD[((Int, Int), Double)] = predictRating.map(item=>((item.user,item.product),item.rating))
    
    sqrt(
      observed.join(predict).map{
        case ((userId,productId),(actual,pre))=>
          val err = actual - pre
          err*err
      }.mean()
    )
  }
  
}

Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。

更多推荐