首页 > 解决方案 > 如何在使用 reducebykey 时限制单个记录中的对数

问题描述

我的输入如下。

输入:

key,A,1
key,A,2
key,A,3
key,A,4
key,A,5
key,A,6

我正在使用下面的代码来实现我的第一个输出

val finalOutputRDD = AddDeletesRDD.map(x => ( x.split("~").slice(0, endOfKeyPosition).mkString(","), x.split("~").slice(0, 1).mkString(",") + "~" + x.split("~").slice(3, 4).mkString(",") ))
 .sortByKey()                                      
 .reduceByKey((key, value) => key +"|" + value)
 .map(records => records._1 + "," + records._2)

finalOutputRDD.saveAsTextFile(deltaFileLocation)

我的输出是:

key,A~1|A~2|A~3|A~4|A~5|A~6|

现在我想动态传递一个值(比如 3),我想要以下输出:

key,A~1|A~2|A~3
key,A~4|A~5|A~6

标签: scalaapache-spark

解决方案


我认为您必须按键分组,然后生成键加整数的新键,按这些键重新组合,最后丢弃生成的整数。就像是:

def reduceByKeyMaxN[K, V](rdd: RDD[(K, V)], n: Int, f: (V, V) => V): RDD[(K, V)] = {
  rdd
    .groupByKey()
    .flatMap { case (k, vs) =>
      vs.zipWithIndex.map{ case (v, i) => ((k, i / n), v) }
    }
    .reduceByKey(f)
    .map { case ((k, _), v) => (k, v) }
 }

然后,您可以将reduceByKey代码中的调用替换为对此方法的调用。


推荐阅读