首页 > 解决方案 > Spark scala,过滤结果大小大于原始数据?

问题描述

遇到了一些关于过滤器的困惑。有一个 RDD val 命名单词如下:

Array(Array('1239423', '42132'), Array('245123', '32412'), ...)

第一个元素是用户 ID,第二个元素是项目 ID。

以及另一个 val trainitemids_value 中的一组有效项目 ID,如下所示:

Array('42132', '43123', ...)

我想使用这个有效的 id 集对单词应用过滤器。据我了解,以下两种方法的输出数量应该相同:

val ids = words.map(line => line(1))
val re = ids.filter(line => trainitemids_value.contains(line))\

或者

val re = words.filter(line => trainitemids_value.contains(line(1)))

但实际上它是不同的。方法1的数量是有意义的,因为它小于原始单词的数量。与原始单词相比,方法 2 的结果具有更大的数量。

我不明白为什么过滤器的输出数量可以大于原始集合?

以下是我的控制台的原始输出:

scala> val ids = words.map(line => line(1))
ids: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at map at <console>:28
scala> val re = ids.filter(line => validID.contains(line))
re: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[15] at filter at <console>:42
scala> re.count()
res4: Long = 42548
scala> val re2 = words.filter(line => validID.contains(line(1)))
re2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[16] at filter at <console>:40
scala> re2.count()
res5: Long = 2448569
scala> words.count()
res6: Long = 42549

根据@vindev 的回答,我尝试缓存单词RDD。结果现在看起来很合理。我仍然没有完全理解原因。以下是解决方案:

scala> val cached = words.cache
cached: words.type = MapPartitionsRDD[13] at map at <console>:26

scala> cached.count()
res7: Long = 42549`
scala> val re3 = cached.filter(line => validID.contains(line(1)))
re3: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[17] at filter at <console>:42
scala> re3.count()
res8: Long = 42548

标签: scalaapache-sparkfilter

解决方案


推荐阅读