Spark使用word2vec训练item2vec实现内容相关推荐

之前使用spark als训练协同过滤,然后导出itemvectors做相似度计算,后来学到了可以用word2vec实现item2vec的训练效果貌似更好,试了一下果然不错;

spark版本:2.3.1,开发语言为JAVA

几大步骤

  1. 读取查看、点击、播放等行为数据,我用的是播放数据;
  2. 数据整理成(userid, itemid, playcnt)的形式,这个数据可能是聚合N天得到的;
  3. 过滤掉playcnt为小于3的数据,我把这些过滤掉,觉得这个数据没有贡献;
  4. 按照userid聚合,得到(userid, list(itemid))的形式;
  5. 训练word2vec;
  6. 导出model.vectors(),里面包括word和对应的向量vector,其中word其实就是itemid
  7. crossjoin计算两两相似度,取相似度TOP N;
  8. 将结果存入mysql,后续可以加载到REDIS实现实时相似推荐;

代码实现

读取播放数据:

Dataset<Row> playDatas = spark.sql(
        "select user_id, item_id, play_cnt " +
                "from hive_play_table group by user_id, item_id");

 

做数据按userid聚合:

playDatas = playDatas
        // 删除掉只播放3次以下的数据
        .filter("play_cnt>2")
        // 按userid聚合
        .groupBy("user_id")
        .agg(collect_list("item_id").as("item_ids"))
        // 至少操作过2个元素
        .where(size(col("item_ids")).geq(2));

 

训练word2vec:

Word2Vec word2Vec = new Word2Vec()
        .setInputCol("item_ids")
        .setOutputCol("word2vec_result")
        .setVectorSize(50)
        .setMinCount(0)
        .setMaxIter(50)
        .setSeed(123);

Word2VecModel word2VecModel = word2Vec.fit(playDatas);

 

实现df的cross join:

Dataset<Row> vectorsA = word2VecModel
        .getVectors()
        .select(
                col("word").as("itemIdA"),
                col("vector").as("vectorA"));

Dataset<Row> vectorsB = word2VecModel
        .getVectors()
        .select(
                col("word").as("itemIdB"),
                col("vector").as("vectorB"));

// self cross join
Dataset<Row> crossDatas = vectorsA.crossJoin(vectorsB);

 

注册余弦相似度计算函数:

spark.udf().register(
        "vectorCosinSim",
        new UDF2<Vector, Vector, Double>() {
            @Override
            public Double call(Vector vectora, Vector vectorb) throws Exception {
                return SimilarityUtils.cosineSimilarity(vectora, vectorb);
            }
        },
        DataTypes.DoubleType
);

 

其中调用的余弦相似度计算函数,使用JAVA实现:

public static double cosineSimilarity(Vector featuresLeft, Vector featuresRight) {
    double[] dataLeft = featuresLeft.toArray();
    List<Float> lista = new ArrayList<>();
    if (dataLeft.length > 0) {
        for (double d : dataLeft) {
            lista.add((float) d);
        }
    }

    double[] dataRight = featuresRight.toArray();
    List<Float> listb = new ArrayList<>();
    if (dataRight.length > 0) {
        for (double d : dataRight) {
            listb.add((float) d);
        }
    }

    return cosineSimilarity(lista, listb);
}

 

实现相似度计算,并过滤掉自身和自身的计算:

crossDatas = crossDatas
        .withColumn(
                "cosineSimilarity", callUDF(
                        "vectorCosinSim", col("vectorA"), col("vectorB")))
        .select("itemIdA", "itemIdB", "cosineSimilarity")
        .filter(col("itemIdA").notEqual(col("itemIdB")));

 

使用 spark的Window,提取每个group的topn:

// 按照相似度倒序排列取TOP 300
WindowSpec windowSpec = Window.partitionBy("itemIdA").orderBy(col("cosineSimilarity").desc());
crossDatas = crossDatas
        .withColumn("simRank", rank().over(windowSpec))
        .where(col("simRank").leq(200));

 

将数据聚合成每个Item的推荐列表的形式:

crossDatas = crossDatas
        .groupBy("itemIdA")
        .agg(
                collect_list("cosineSimilarity").as("columnSims"),
                collect_list("itemIdB").as("itemIds")
        ).select(
                col("itemIdA").as("item_id").cast(DataTypes.LongType),
                col("columnSims").as("column_sims").cast(DataTypes.StringType),
                col("itemIds").as("item_ids").cast(DataTypes.StringType)
        );

 

将数据覆盖写入MySQL:

crossDatas.write().mode(SaveMode.Overwrite).jdbc(
        MysqlConfig.ONLINE_MYSQL_MASTER_URL,
        "item2vec_sims",
        MysqlConfig.getOnlineProperties()
);

 

在数据库中,我们根据item_id,提取到item_ids,可以用于直接的推荐;其中column_sims也记录了对应的相似度权重,如果需要加权的话也可以直接提取;

欢迎大家关注我的爱奇艺号,学习Pyton大数据人工智能技术,地址

本文地址:http://www.crazyant.net/2447.html,转载请注明来源。

 

 

 

相关推荐

Leave a Comment