首页 > 解决方案 > PubsubIO , msg 超过最大大小,如何执行错误处理

问题描述

我们在 GCP Dataflow 中运行管道,并遇到 pubsub 消息的最大消息大小 [1] 当这种情况发生时,管道延迟时间将开始增加,最终停止运行......

此日志消息是在“dataflow_step”下的 GCP 堆栈驱动程序中生成的,

我的问题,有没有办法在管道中定义错误处理......

.apply(PubsubIO.writeMessages()
                        .to("topic")
                        .withTimestampAttribute(Instant.now().toString()));

有类似的东西

.onError(...perform error handling ...)

以与 Java8 流 api 类似的流畅方式。这将允许管道继续使用 pubsub 限制内的输出。

处理这种情况的其他解决方案是最受欢迎的。

谢谢你,克里斯托夫·布希耶

[1] 由于验证错误而无法提交请求:generic::invalid_argument: Pubsub 发布请求限制为 10MB,拒绝超过 7MB 的消息以避免超出 byte64 请求编码的限制。

标签: apache-beamgoogle-cloud-pubsubdataflow

解决方案


对于 Dataflow 上的 PubsubIO 的特殊情况,请注意 Dataflow 会覆盖 PubsubIO 并处理对 Pubsub 的读取和写入消息,作为其流式实现的一部分。由于这种替换,我已经看到您正在讨论的相同错误出现在“shuffler”而不是“worker”下的日志中。

我通过在 PubsubIO.write() 步骤之前实现自定义转换来解决同样的问题。此 LimitPayloadSize 转换仅检查 PubsubMessage 中有多少字节,并且只允许通过有效负载小于 7 MB 的消息。

目前还没有一个流畅的 API 用于转换中的错误处理,尽管这已经讨论过了。目前,公认的模式是定义具有多个输出集合的转换,然后将失败消息的集合写入其他地方(例如通过 FileIO 的 GCS)。您可以将其实现为裸 DoFn,或者您可以查看 Partition:

PCollectionList<PubsubMessage> limitedPayloads = input
        .apply("Limit payload size",
                Partition
                        .of(2, new PartitionFn<PubsubMessage>() {
  public int partitionFor(PubsubMessage message, int numPartitions) {
    return message.getPayload().size < 7 * 1000 * 1000 ? 0 : 1;
  }
}));
limitedPayloads.get(0).apply(PubsubIO.write()...);
limitedPayloads.get(1).apply(FileIO.write()...);

推荐阅读