google-cloud-dataflow - Splittable DoFn causes a "Shuffle key too large" error
Problem description
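I am trying to implement a ListFlatten function. I have already implemented it with SimpleDoFn and it works fine, but to parallelize it I am converting the function into a splittable DoFn. A unit test with 5000 elements runs fine locally on the DirectRunner, but running the same function on Dataflow fails with the following error.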
Error Details:
```text
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: java.io.IOException: INVALID_ARGUMENT: Shuffle key too large:3749653 > 1572864
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output (GroupAlsoByWindowsParDoFn.java:184)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue (GroupAlsoByWindowFnRunner.java:102)
    at org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn.processElement (BatchGroupAlsoByWindowViaIteratorsFn.java:126)
    at org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn.processElement (BatchGroupAlsoByWindowViaIteratorsFn.java:54)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement (GroupAlsoByWindowFnRunner.java:115)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement (GroupAlsoByWindowFnRunner.java:73)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement (GroupAlsoByWindowsParDoFn.java:114)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process (ParDoOperation.java:44)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process (OutputReceiver.java:49)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop (ReadOperation.java:201)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: java.io.IOException: INVALID_ARGUMENT: Shuffle key too large:3749653 > 1572864
    at com.abc.common.batch.functions.AbcListFlattenFn.splitRestriction (AbcListFlattenFn.java:68)
```
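The two numbers in the error message appear to be byte counts (the limit is exactly 1.5 × 2^20, which suggests bytes, though the message itself does not state a unit). Converting them puts the overrun in perspective:

$$
1572864 = 1.5 \times 2^{20}\ \text{bytes} = 1.5\ \text{MiB}, \qquad
\frac{3749653}{2^{20}} \approx 3.58\ \text{MiB}, \qquad
\frac{3749653}{1572864} \approx 2.38
$$

So the offending key is roughly 2.4 times the allowed size.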
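The data differences between the local DirectRunner and the Cloud Dataflow runner are as follows.
Local DirectRunner:
- The sample input PCollection element contains 5000 abcs.
DataflowRunner in the cloud:
- There are 600 input PCollection elements, with abcs lists of varying sizes.
- A few input elements have 50000 abcs to flatten.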
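```java
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AbcList implements Serializable {
    private List<Abc> abcs;
    private List<Xyz> xyzs;
    // getters omitted
}

public class AbcListFlattenFn extends DoFn<AbcList, KV<Abc, List<Xyz>>> {
    private static final Logger log = LoggerFactory.getLogger(AbcListFlattenFn.class);

    @ProcessElement
    public void process(@Element AbcList input,
            ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) {
        try {
            /* The commented-out lines below are the original version without Splittable DoFn:
            input.getAbcs().stream().forEach(abc -> {
                context.output(KV.of(abc, input.getXyzs()));
            }); */
            // Emit one KV per claimed offset; each offset indexes into the abcs list.
            for (long index = tracker.currentRestriction().getFrom(); tracker.tryClaim(index);
                    ++index) {
                context.output(KV.of(input.getAbcs().get(Math.toIntExact(index)), input.getXyzs()));
            }
        } catch (Exception e) {
            log.error("Flattening AbcList has failed ", e);
        }
    }

    @GetInitialRestriction
    public OffsetRange getInitialRestriction(AbcList input) {
        // One offset per entry in the abcs list.
        return new OffsetRange(0, input.getAbcs().size());
    }

    @SplitRestriction
    public void splitRestriction(final AbcList input,
            final OffsetRange range, final OutputReceiver<OffsetRange> receiver) {
        // Desired split size: at most 5000 offsets; minimum split size: 2000 offsets.
        List<OffsetRange> ranges =
                range.split(input.getAbcs().size() > 5000 ? 5000
                        : input.getAbcs().size(), 2000);
        for (final OffsetRange p : ranges) {
            receiver.output(p);
        }
    }

    @NewTracker
    public OffsetRangeTracker newTracker(OffsetRange range) {
        return new OffsetRangeTracker(range);
    }
}
```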
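To see how many restrictions splitRestriction produces per element, here is a plain-Java sketch of the splitting arithmetic. Range, splitOffsets and splitLikeQuestion are hypothetical names written for illustration. splitOffsets models an assumption about OffsetRange.split(desiredNumOffsetsPerSplit, minNumOffsetPerSplit): it cuts the range into chunks of the desired size and folds a tail shorter than the minimum into the previous chunk. It is not Beam's code, and Beam's exact behavior should be checked against its Javadoc.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java sketch of the splitting done in splitRestriction.
 * Range and splitOffsets are hypothetical stand-ins (not Beam code) that model an
 * assumed behavior of OffsetRange.split(desiredNumOffsetsPerSplit, minNumOffsetPerSplit).
 */
public class SplitSketch {

    /** Half-open offset range [from, to), standing in for Beam's OffsetRange. */
    static final class Range {
        final long from;
        final long to;

        Range(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public String toString() {
            return "[" + from + ", " + to + ")";
        }
    }

    /**
     * Cuts [from, to) into chunks of `desired` offsets. If the piece left after a chunk
     * would be shorter than `min`, it is folded into that chunk instead.
     */
    static List<Range> splitOffsets(long from, long to, long desired, long min) {
        List<Range> out = new ArrayList<>();
        long start = from;
        while (start < to) {
            long end = Math.min(start + desired, to);
            if (end < to && to - end < min) {
                end = to; // absorb a too-small tail into this chunk
            }
            out.add(new Range(start, end));
            start = end;
        }
        return out;
    }

    /** Same parameters as the question: desired = min(size, 5000), min = 2000. */
    static List<Range> splitLikeQuestion(long numAbcs) {
        long desired = numAbcs > 5000 ? 5000 : numAbcs;
        return splitOffsets(0, numAbcs, desired, 2000);
    }

    public static void main(String[] args) {
        for (long n : new long[] {5000, 11000, 50000}) {
            List<Range> ranges = splitLikeQuestion(n);
            System.out.println(n + " abcs -> " + ranges.size() + " restrictions: " + ranges);
        }
    }
}
```

Under that assumption, an element with 50000 abcs becomes 10 restrictions of 5000 offsets each, and one with 11000 abcs becomes [0, 5000) and [5000, 11000), because the 1000-offset tail is below the 2000 minimum. In the SDF model each restriction is still processed together with the complete AbcList element, so splitting divides the work but does not shrink the element itself.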
Can anyone point out what is wrong with the ListFlatten function here? Is splitRestriction causing this error? How can I fix this shuffle key size problem?
Solution
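The shuffle key size limit is due to the proto size. To get around this problem, you may want to add a Reshuffle before your SDF. The Reshuffle will help with the first round of distribution.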