首页 > 解决方案 > 为什么在向量的 Spark 数据帧上进行 MergeSort 连接后的不同计数会给出不确定的结果

问题描述

使用包含 ml Vector 的列的 spark 内部 SortMerge Join 似乎会在较大的数据集上给出不确定和不准确的结果。

我正在使用 Spark v2.4.3 的 BucketRandomLSH 投影的approxNearestNeighbors方法,发现它为大型数据集提供了不同数量的结果。

此问题仅在执行 SortMerge 连接时出现;广播连接每次都给出相同的结果。

我将问题追溯到 LSH 哈希键上的连接。下面是一个可重现的示例...

import org.apache.spark.sql.functions._
import org.apache.spark.ml.linalg.Vectors
import scala.util.Random
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
//create a large dataframe containing an Array (length two) of ml Vectors of length one.
val df =  Seq.fill(30000)(
        (java.util.UUID.randomUUID.toString,
          Seq(
                  Vectors.dense(Random.nextInt(10).toDouble),                                                                 
                  Vectors.dense(Random.nextInt(10).toDouble))
             )
      ).toDF

//ensure it's caches
df.cache.count()

//positional explode the vector column
val dfExploded = df.select(col("*"), posexplode(col("_2")))

// now self join on the exploded 'col' and 'pos' fields
dfExploded.join(dfExploded, Seq("pos","col")).drop("pos","col").distinct.count

每次结果都不一样...

scala>dfExploded.join(dfExploded,Seq("pos","col")).drop("pos","col").distinct.count
res139: Long = 139663581                                                        

scala>dfExploded.join(dfExploded,Seq("pos","col")).drop("pos","col").distinct.count
res140: Long = 156349630  

标签: apache-spark

解决方案


推荐阅读