apache-spark - Spark:在每个分区中变得不同
问题描述
我想使用 ID 对数据进行分区,并且在每个分区中我想
- 应用一组操作
- 采取不同的
在每个分区内进行区分将避免洗牌。
val rowRDD = sc.textFile("flatten_test_data")
.filter(_.nonEmpty)
.map { l =>
val arr = l.split("\u0001")
val id = arr(0)
val value = arr(1)
(id,value)
}.partitionBy(new HashPartitioner(4))
.persist()
现在做一些类似的事情 -
rowRDD.foreachPartition {records => applyOpers(records)}
applyOpers(dataset) 应该做类似的事情 -
dataset.withColumn(udf1).withColumn(udf2).distinct
解决方案
forEachPartition
在 executor 上执行。因此,您不能在 forEachPartition 中访问 SparkContext/SparkSession。
您可以用作&mapPartitions()
的替代品。与&不同,每个 Partition 调用一次,而&则为 RDD 中的每个元素调用一次。主要优点是,我们可以基于每个分区而不是基于每个元素进行初始化。map()
foreach()
mapPartitions()
map()
foreach()
我们得到Iterator
的参数mapPartition
,通过它我们可以遍历分区中的所有元素。
例如:(这个例子是在java中,但这应该给你一个想法。)
JavaRDD<Integer> rdd = sc.parallelize(
Arrays.asList(1, 2, 3, 4, 5));
FlatMapFunction<Iterator<Integer>, AvgCount> setup = new FlatMapFunction<Iterator<Integer>, AvgCount>() {
@Override
public Iterable<AvgCount> call(Iterator<Integer> input) {
AvgCount a = new AvgCount(0, 0);
while (input.hasNext()) {
a.total_ += input.next();
a.num_ += 1;
}
ArrayList<AvgCount> ret = new ArrayList<AvgCount>();
ret.add(a);
return ret;
}
};
Function2<AvgCount, AvgCount, AvgCount> combine = new Function2<AvgCount, AvgCount, AvgCount>() {
@Override
public AvgCount call(AvgCount a, AvgCount b) {
a.total_ += b.total_;
a.num_ += b.num_;
return a;
}
};
AvgCount result = rdd.mapPartitions(setup).reduce(combine);
推荐阅读
- git - 将分支重新基于重写的历史记录时,如何避免虚假的合并冲突?
- list - SwiftUI - 两个单独的按钮作为一个按钮
- sqlite - 在 SQLite 中创建表失败
- ios - 我在主包中找不到我的 AppIcon
- gstreamer - 我想配置 autoaudiosink
- python - 每次我尝试按字符串值过滤数据框时,我的数据框都是空的。但是当我尝试从一列中获取计数值时,我得到了数字
- macos - 如何批量重命名文件/文件夹 - 在 n 个字符后插入一个字符
- javascript - 检测所有顶级目录
- javascript - 刷新页面时,Array.length 返回未定义
- kendo-grid - 使用 cypress 在 Kendo UI Grid 中选择一行