scala - Spark scala,过滤结果大小大于原始数据?
问题描述
遇到了一些关于过滤器的困惑。有一个 RDD val 命名单词如下:
Array(Array('1239423', '42132'), Array('245123', '32412'), ...)
第一个元素是用户 ID,第二个元素是项目 ID。
以及另一个 val trainitemids_value 中的一组有效项目 ID,如下所示:
Array('42132', '43123', ...)
我想使用这个有效的 id 集对单词应用过滤器。据我了解,以下两种方法的输出数量应该相同:
val ids = words.map(line => line(1))
val re = ids.filter(line => trainitemids_value.contains(line))\
或者
val re = words.filter(line => trainitemids_value.contains(line(1)))
但实际上它是不同的。方法1的数量是有意义的,因为它小于原始单词的数量。与原始单词相比,方法 2 的结果具有更大的数量。
我不明白为什么过滤器的输出数量可以大于原始集合?
以下是我的控制台的原始输出:
scala> val ids = words.map(line => line(1))
ids: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at map at <console>:28
scala> val re = ids.filter(line => validID.contains(line))
re: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[15] at filter at <console>:42
scala> re.count()
res4: Long = 42548
scala> val re2 = words.filter(line => validID.contains(line(1)))
re2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[16] at filter at <console>:40
scala> re2.count()
res5: Long = 2448569
scala> words.count()
res6: Long = 42549
根据@vindev 的回答,我尝试缓存单词RDD。结果现在看起来很合理。我仍然没有完全理解原因。以下是解决方案:
scala> val cached = words.cache
cached: words.type = MapPartitionsRDD[13] at map at <console>:26
scala> cached.count()
res7: Long = 42549`
scala> val re3 = cached.filter(line => validID.contains(line(1)))
re3: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[17] at filter at <console>:42
scala> re3.count()
res8: Long = 42548
解决方案
推荐阅读
- curl - 为什么多格式的 curl 命令不起作用
- java - 使用 Java 中的 ARN 角色连接到 AWS S3
- javascript - 如何使用 Chrome 驱动程序(Python 和 Selenium)在 chrome 中禁用 JavaScript
- android - Android 在 CheckedTextView 中更改颜色
- angular - 跟踪网站上的设计更改
- asp.net - 通过 MailKit 发送电子邮件的问题
- ios - +[_CFXNotificationTokenRegistration keyCallbacks] 崩溃
- reactjs - 当我们使用 react-datepicker 时,时间正在改变
- android-studio - 在源代码中查看资源变量内容的任何快捷方式?
- airflow - All_Success 触发规则未按预期工作