首页 > 解决方案 > 在 rdd 上使用累加器循环

问题描述

我想循环 n 次,其中 n 是同一个 rdd 上的累加器

假设 n = 10 所以我希望下面的代码循环 5 次(因为累加器增加了 2)

val key = keyAcm.value.toInt
val rest = rdd.filter(_._1 > (key + 1))
val combined = rdd.filter(k => (k._1 == key) || (k._1 == key + 1))
                  .map(x => (key, x._2))
                  .reduceByKey { case (x, y) => (x ++ y) }
keyAcm.add(2)
combined.union(rest)

使用此代码,我过滤 rdd 并保留键 0(累加器的初始值)和 1。然后,我尝试合并其第二个参数并更改键以创建具有键 0 和合并数组的新 rdd。之后,我将此 rdd 与原始的 rdd 合并,留下过滤后的值(0 和 1)。最后,我将累加器增加了 2。如何重复这些步骤直到累加器为 10?

有任何想法吗?

标签: scalaapache-sparkspark-streaming

解决方案


val rdd: RDD[(Int, String)] = ???
val res: RDD[(Int, Iterable[String])] = rdd.map(x => (x._1 / 2, x._2)).groupByKey()

推荐阅读