Splittable DoFn causes a "Shuffle key too large" error

Problem Description

I am trying to implement a ListFlatten capability. I already implemented it with a simple DoFn and it works fine, but for better parallelism I am converting the function to a Splittable DoFn. I managed to run a unit test locally on the DirectRunner with 5,000 elements, yet running the same elements on Dataflow fails with the following error.

Error Details: 
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: java.io.IOException: INVALID_ARGUMENT: Shuffle key too large:3749653 > 1572864
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output (GroupAlsoByWindowsParDoFn.java:184)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue (GroupAlsoByWindowFnRunner.java:102)
at org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn.processElement (BatchGroupAlsoByWindowViaIteratorsFn.java:126)
at org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn.processElement (BatchGroupAlsoByWindowViaIteratorsFn.java:54)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement (GroupAlsoByWindowFnRunner.java:115)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement (GroupAlsoByWindowFnRunner.java:73)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement (GroupAlsoByWindowsParDoFn.java:114)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process (ParDoOperation.java:44)
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process (OutputReceiver.java:49)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop (ReadOperation.java:201)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: java.io.IOException: INVALID_ARGUMENT: Shuffle key too large:3749653 > 1572864
at com.abc.common.batch.functions.AbcListFlattenFn.splitRestriction (AbcListFlattenFn.java:68)

The difference in data between the local DirectRunner and the Cloud Dataflow runner is given below.

Local DirectRunner:

  1. The sample input PCollection elements carry 5,000 abcs

DataflowRunner in the cloud:

  1. abcs of varying sizes across the 600 input PCollection elements
  2. A few of the input elements have 50,000 abcs to flatten
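
(For scale: the 3,749,653-byte shuffle key in the error is roughly 2.4x the 1,572,864-byte, i.e. 1.5 MiB, limit, which is consistent with one of those ~50,000-abc elements being serialized whole. That is a back-of-envelope reading of the numbers, not something stated in the question.)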
The element type:

    public class AbcList implements Serializable {
        private List<Abc> abcs;  // the entries to flatten; a few elements carry ~50,000 of them
        private List<Xyz> xyzs;  // the payload emitted alongside every abc
    }

And the Splittable DoFn:

    import java.util.List;
    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
    import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
    import org.apache.beam.sdk.values.KV;

    public class AbcListFlattenFn extends DoFn<AbcList, KV<Abc, List<Xyz>>> {

        @ProcessElement
        public void process(@Element AbcList input,
            ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) {
            try {
                /* The lines below are the pre-splittable version:
                   input.getAbcs().stream().forEach(abc ->
                       context.output(KV.of(abc, input.getXyzs()))); */

                // Claim one offset at a time; stop as soon as the tracker
                // refuses, i.e. the rest of the restriction belongs to a split.
                for (long index = tracker.currentRestriction().getFrom();
                    tracker.tryClaim(index); ++index) {
                    context.output(
                        KV.of(input.getAbcs().get(Math.toIntExact(index)), input.getXyzs()));
                }
            } catch (Exception e) {
                // "log" is assumed to be provided, e.g. by Lombok's @Slf4j.
                log.error("Flattening AbcList has failed ", e);
            }
        }

        @GetInitialRestriction
        public OffsetRange getInitialRestriction(AbcList input) {
            // One offset per abc in the element.
            return new OffsetRange(0, input.getAbcs().size());
        }

        @SplitRestriction
        public void splitRestriction(final AbcList input,
            final OffsetRange range, final OutputReceiver<OffsetRange> receiver) {
            // Chunks of at most 5,000 offsets; OffsetRange.split() merges a
            // trailing chunk smaller than 2,000 into its predecessor.
            List<OffsetRange> ranges =
                range.split(Math.min(input.getAbcs().size(), 5000), 2000);
            for (final OffsetRange p : ranges) {
                receiver.output(p);
            }
        }

        @NewTracker
        public OffsetRangeTracker newTracker(OffsetRange range) {
            return new OffsetRangeTracker(range);
        }
    }
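
For context, the fn is applied as an ordinary ParDo. A minimal wiring sketch (the collection name abcLists is illustrative and the upstream read is elided, since neither is shown in the question):

    PCollection<AbcList> abcLists = ...;  // upstream source, not shown in the question
    PCollection<KV<Abc, List<Xyz>>> flattened =
        abcLists.apply("FlattenAbcLists", ParDo.of(new AbcListFlattenFn()));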

Can someone point out what is wrong with the ListFlatten function here? Is splitRestriction causing the problem? How can this shuffle key size issue be resolved?

Tags: google-cloud-dataflow, apache-beam, apache-beam-io

Solution


The shuffle key size limit is due to the proto size: to distribute the restrictions that splitRestriction produces, Dataflow shuffles the (element, restriction) pairs, and the serialized AbcList element travels in the shuffle key, so an element with ~50,000 abcs overruns the 1,572,864-byte (1.5 MiB) limit. To get past this issue, you may want to add a Reshuffle before your SDF. The Reshuffle will help you with the first round of distribution.
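
In the Java SDK that is a one-transform change in front of the SDF. A minimal sketch building on the wiring shown above (Reshuffle.viaRandomKey() is the stock Beam transform; the step names are illustrative):

    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Reshuffle;

    PCollection<KV<Abc, List<Xyz>>> flattened = abcLists
        // First round of distribution: shuffle on a small random key, so the
        // large serialized AbcList travels as a value, not as a shuffle key.
        .apply("RedistributeAbcLists", Reshuffle.<AbcList>viaRandomKey())
        .apply("FlattenAbcLists", ParDo.of(new AbcListFlattenFn()));

Note that Reshuffle is marked @Deprecated in the SDK, but it remains the usual way to force a redistribution of elements on Dataflow.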

