Spark使用Java开发遇到的那些类型错误

Spark使用Java开发其实比较方便的,JAVA8的lambda表达式使得编写体验并不比Scala差很多,但是因为Spark本身使用Scala实现,导致使用Java开发的时候,也遇到不少的类型匹配问题。

本文列举出自己在工作开发中遇到的一些问题,供大家参考:

WrappedArray和Vector

报错信息为:Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to org.apache.spark.ml.linalg.Vector

当使用DataFrame打印Schema的时候,是这样的输出:

 |-- tag_weights: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- word_sims: array (nullable = true)
 |    |-- element: double (containsNull = true)

 

这时候如果Java用Vector接收,就会报这个错误,JAVA代码为:

spark.udf().register(
                "computeWeightSim",
                new UDF2<Vector, Vector, Double>() {
                    @Override
                    public Double call(Vector tag_weights, Vector word_sims) throws Exception {

解决办法是使用WrappedArray<Long>来接收,这是个scala的类型,可以用Iterator做遍历:

scala.collection.Iterator<Long> it1 = view_qipuids.iterator();
scala.collection.Iterator<Long> it2 = view_cnts.iterator();

Map<Long, Long> viewMap = new HashMap<>();
while (it1.hasNext() && it2.hasNext()) {
    viewMap.put(it1.next(), it2.next());
}

或者可以zip两个iterator进行计算:

new UDF2<WrappedArray<Double>, WrappedArray<Double>, Double>() {
    /**
     * 计算加权权重
     * @param tag_weights 加权
     * @param word_sims 计算结果目标
     * @return 加权权重
     * @throws Exception
     */
    @Override
    public Double call(WrappedArray<Double> tag_weights, WrappedArray<Double> word_sims) throws Exception {
        scala.collection.Iterator<Double> tag_weightsIter = tag_weights.iterator();
        scala.collection.Iterator<Double> word_simsIter = word_sims.iterator();

        scala.collection.Iterator<Tuple2<Double, Double>> zipIterator = tag_weightsIter.zip(word_simsIter);

        double totalWeight = 0.0;
        double fenziWeight = 0.0;
        while (zipIterator.hasNext()) {
            Tuple2<Double, Double> iterTuple = zipIterator.next();
            totalWeight += iterTuple._1;
            fenziWeight += iterTuple._1 * iterTuple._2;
        }

        if (totalWeight == 0.0) {
            return 0.0;
        } else {
            return fenziWeight / totalWeight;
        }
    }
}

 

详细内容见scala的文档:https://docs.scala-lang.org/overviews/collections/iterators.html

 

相关推荐

Leave a Comment