scala - 在 rdd 上使用累加器循环
问题描述
我想循环 n 次,其中 n 是同一个 rdd 上的累加器
假设 n = 10 所以我希望下面的代码循环 5 次(因为累加器增加了 2)
val key = keyAcm.value.toInt
val rest = rdd.filter(_._1 > (key + 1))
val combined = rdd.filter(k => (k._1 == key) || (k._1 == key + 1))
.map(x => (key, x._2))
.reduceByKey { case (x, y) => (x ++ y) }
keyAcm.add(2)
combined.union(rest)
使用此代码,我过滤 rdd 并保留键 0(累加器的初始值)和 1。然后,我尝试合并其第二个参数并更改键以创建具有键 0 和合并数组的新 rdd。之后,我将此 rdd 与原始的 rdd 合并,留下过滤后的值(0 和 1)。最后,我将累加器增加了 2。如何重复这些步骤直到累加器为 10?
有任何想法吗?
解决方案
val rdd: RDD[(Int, String)] = ???
val res: RDD[(Int, Iterable[String])] = rdd.map(x => (x._1 / 2, x._2)).groupByKey()
推荐阅读
- python - 错误:但是这台机器只有:['/cpu:0']。- 但识别 2 gpus
- ios - iOS:如何正确设置/重置 UIScreen.main.brightness
- php - 如何在 CakePHP 中获得退回的电子邮件?
- java - 是否可以设置计时器以在材料设计中显示/隐藏密码?
- assembly - 汇编语言新手并出现错误:“add.s:7: Error: shift expression expected -- `adds R0,R1,R2,R3'”
- python - 在字符串中查找特定单词+任何单词+特定单词python
- vue.js - Vue-table-2 表不会更新
- javascript - 保存在 indexeddb 中时,文件上的分配属性消失
- codesys - 如何在 CODESYS 中参数化功能块?
- java - 如何使用通过以太网在另一个线程中接收的数据更新 javaFX 表