apache-beam - PubsubIO , msg 超过最大大小,如何执行错误处理
问题描述
我们在 GCP Dataflow 中运行管道,并遇到 pubsub 消息的最大消息大小 [1] 当这种情况发生时,管道延迟时间将开始增加,最终停止运行......
此日志消息是在“dataflow_step”下的 GCP 堆栈驱动程序中生成的,
我的问题,有没有办法在管道中定义错误处理......
.apply(PubsubIO.writeMessages()
.to("topic")
.withTimestampAttribute(Instant.now().toString()));
有类似的东西
.onError(...perform error handling ...)
以与 Java8 流 api 类似的流畅方式。这将允许管道继续使用 pubsub 限制内的输出。
处理这种情况的其他解决方案是最受欢迎的。
谢谢你,克里斯托夫·布希耶
[1] 由于验证错误而无法提交请求:generic::invalid_argument: Pubsub 发布请求限制为 10MB,拒绝超过 7MB 的消息以避免超出 byte64 请求编码的限制。
解决方案
对于 Dataflow 上的 PubsubIO 的特殊情况,请注意 Dataflow 会覆盖 PubsubIO 并处理对 Pubsub 的读取和写入消息,作为其流式实现的一部分。由于这种替换,我已经看到您正在讨论的相同错误出现在“shuffler”而不是“worker”下的日志中。
我通过在 PubsubIO.write() 步骤之前实现自定义转换来解决同样的问题。此 LimitPayloadSize 转换仅检查 PubsubMessage 中有多少字节,并且只允许通过有效负载小于 7 MB 的消息。
目前还没有一个流畅的 API 用于转换中的错误处理,尽管这已经讨论过了。目前,公认的模式是定义具有多个输出集合的转换,然后将失败消息的集合写入其他地方(例如通过 FileIO 的 GCS)。您可以将其实现为裸 DoFn,或者您可以查看 Partition:
PCollectionList<PubsubMessage> limitedPayloads = input
.apply("Limit payload size",
Partition
.of(2, new PartitionFn<PubsubMessage>() {
public int partitionFor(PubsubMessage message, int numPartitions) {
return message.getPayload().size < 7 * 1000 * 1000 ? 0 : 1;
}
}));
limitedPayloads.get(0).apply(PubsubIO.write()...);
limitedPayloads.get(1).apply(FileIO.write()...);
推荐阅读
- java - 如何创建端点以下载 PDF 并与服务器发送的事件共享生成进度
- android - 如何从自定义视图访问主机片段的生命周期范围?
- php - 多个 setcookie() 调用产生 502 错误
- python - 如何将函数应用于数组中的每个 3d 矩阵?
- ios - Firebase 是否仍将版本更新计为 first_open 事件?
- c# - 使用 SSL 到 KDPW 设置 .Net IBM.XMS 客户端
- spring - 使用 resteasy 3.0.8 和 spring boot 1.2.6 进行分段上传
- google-analytics - 如何在 Google Analytics(分析)> 行为 > 事件中按用户 ID 排除数据?
- python - 标签按钮在纹理之外是可点击的
- css - 将 wordpress 网站从 2 列更改为只有 1 列