scala - 如何在使用 reducebykey 时限制单个记录中的对数
问题描述
我的输入如下。
输入:
key,A,1
key,A,2
key,A,3
key,A,4
key,A,5
key,A,6
我正在使用下面的代码来实现我的第一个输出
val finalOutputRDD = AddDeletesRDD.map(x => ( x.split("~").slice(0, endOfKeyPosition).mkString(","), x.split("~").slice(0, 1).mkString(",") + "~" + x.split("~").slice(3, 4).mkString(",") ))
.sortByKey()
.reduceByKey((key, value) => key +"|" + value)
.map(records => records._1 + "," + records._2)
finalOutputRDD.saveAsTextFile(deltaFileLocation)
我的输出是:
key,A~1|A~2|A~3|A~4|A~5|A~6|
现在我想动态传递一个值(比如 3),我想要以下输出:
key,A~1|A~2|A~3
key,A~4|A~5|A~6
解决方案
我认为您必须按键分组,然后生成键加整数的新键,按这些键重新组合,最后丢弃生成的整数。就像是:
def reduceByKeyMaxN[K, V](rdd: RDD[(K, V)], n: Int, f: (V, V) => V): RDD[(K, V)] = {
rdd
.groupByKey()
.flatMap { case (k, vs) =>
vs.zipWithIndex.map{ case (v, i) => ((k, i / n), v) }
}
.reduceByKey(f)
.map { case ((k, _), v) => (k, v) }
}
然后,您可以将reduceByKey
代码中的调用替换为对此方法的调用。
推荐阅读
- regex - 在正则表达式中捕获特定范围的逗号分隔字符串
- r - 如何在 R 中绘制布朗运动(布莱克斯科尔斯模拟)
- javascript - 如何以角度将缓冲区数据转换为base64图像
- suitecrm - 自定义套件 CRM 字段
- c# - 布局组件中的访问方法
- ruby-on-rails - NameError:未初始化的常量 ActiveRecord::ConnectionAdapters::PostgreSQL::ColumnDefinition 与 gem activerecord-postgis-adapter
- python - 当你使用 len(sheet[column_a]) 时,openpyxl 总是输出最长列的长度,即使 column_a 不是最长的
- python - 训练前后的皮尔逊相关性计算相对于一个值
- vue.js - 从自定义 .js 文件访问应用程序上下文以获取语言环境消息
- babylonjs - 在 babylon.js 中绘制由 CSG 创建的网格的外边缘