apache-spark - 如何使用 Spark Runner 在 Apache Beam 中重新洗牌
问题描述
我正在使用 spark runner 进行此模拟:
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
p.apply(Create.of(1))
.apply(ParDo.of(new DoFn<Integer, Integer>() {
@ProcessElement
public void apply(@Element Integer element, OutputReceiver<Integer> outputReceiver) {
IntStream.range(0, 4_000_000).forEach(outputReceiver::output);
}
}))
.apply(Reshuffle.viaRandomKey())
.apply(ParDo.of(new DoFn<Integer, Integer>() {
@ProcessElement
public void apply(@Element Integer element, OutputReceiver<Integer> outputReceiver) {
try {
// simulate a rpc call of 10ms
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
outputReceiver.output(element);
}
}));
PipelineResult result = p.run();
result.waitUntilFinish();
我正在运行,--runner=SparkRunner --sparkMaster=local[8]
但重新洗牌后只使用了 1 个线程。为什么 Rechuffle 不起作用?
如果我为此更改改组:
.apply(MapElements.into(kvs(integers(), integers())).via(e -> KV.of(e % 8, e)))
.apply(GroupByKey.create())
.apply(Values.create())
.apply(Flatten.iterables())
然后我运行 8 个线程。
BR,拉斐尔。
解决方案
看起来 Reshuffle on Beam on Spark 归结为在
我想知道在这种情况下是否两者rdd.context().defaultParallelism()
都是rdd.getNumPartitions()
1。我已提交https://issues.apache.org/jira/browse/BEAM-10834进行调查。
同时,您可以使用 GroupByKey 来获得所需的并行度,如您所指出的。(如果您实际上没有整数,您可以尝试使用元素的哈希值、Math.random() 或什至递增计数器作为键)。
推荐阅读
- php - 如何删除重复行并继续该值?
- django - 错误“无法更改,因为数据未验证”
- gitlab - CI_APPLICATION_TAG 在 GItLab 中定义在哪里?
- java - 动态分配 Java 数组意味着什么?
- ruby-on-rails - 在 Ruby on Rails 中部分渲染反应组件
- reporting-services - 如何在来自另一个数据集的文本框中显示 SUM 值,而不是来自 tablix 数据集名称
- assembly - 时间戳在文件中的什么位置?想用汇编来改变它
- xslt - xsl中的大于和小于条件
- awk - 使用 awk 显示从第一列开始的行范围
- google-apps-script - Google App Script - 无法插入群组成员