apache-kafka - 致谢 Kafka Producer Apache Beam
问题描述
如何获取在 apache Beam KafkaIO 中收到确认的记录?
基本上,我希望我没有得到任何确认的所有记录都转到一个 bigquery 表,以便我可以稍后重试。我使用了文档中的以下代码片段
.apply(KafkaIO.<Long, String>read()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
// Above four are required configuration. returns PCollection<KafkaRecord<Long, String>>
// Rest of the settings are optional :
// you can further customize KafkaConsumer used to read the records by adding more
// settings for ConsumerConfig. e.g :
.updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))
// set event times and watermark based on LogAppendTime. To provide a custom
// policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
.withLogAppendTime()
// restrict reader to committed messages on Kafka (see method documentation).
.withReadCommitted()
// offset consumed by the pipeline can be committed back.
.commitOffsetsInFinalize()
// finally, if you don't need Kafka metadata, you can drop it.g
.withoutMetadata() // PCollection<KV<Long, String>>
)
.apply(Values.<String>create()) // PCollection<String>
解决方案
默认情况下,Beam IO 旨在不断尝试写入/读取/处理元素,直到 . (重复错误后批处理管道会失败)
您所指的通常称为死信队列,用于获取失败的记录并将它们添加到 PCollection、Pubsub 主题、排队服务等。这通常是可取的,因为它允许流式管道取得进展(不是块),当遇到写入某些记录的错误时,但允许写入成功的一次。
不幸的是,除非我弄错了,否则 Kafka IO 中没有实现死信队列。可以修改 KafkaIO 来支持这一点。在 Beam 邮件列表上进行了一些讨论,提出了一些实现这一点的想法,这可能有一些想法。
我怀疑可以将它添加到KafkaWriter中,捕获失败的记录并将它们输出到另一个 PCollection。如果您选择实现此功能,请同时联系 beam社区邮件列表,如果您需要帮助将其合并到 master 中,他们将能够帮助确保更改涵盖必要的要求,以便可以合并并作为一个有意义的整体为梁。
然后,您的管道可以将它们写入其他地方(即不同的来源)。当然,如果该辅助源同时出现中断/问题,您将需要另一个 DLQ。
推荐阅读
- python - 单击特定 div 下的每个链接 - python selenium
- ios - 如何在 Swift 中为 NSObject 模型变量添加值?
- python-3.x - 使用机器人定期发送电报消息
- github - GitHub 回购转移
- algorithm - 使用 Dinic 的 O((V^2)E) 算法优于 Edmond-Karp 算法 O(V(E^2)) 的优势
- python - 虚拟环境中的 clj-python
- amazon-web-services - 创建 IAM 角色时出错。MalformedPolicyDocument:已禁止字段资源。地形
- c# - 我想使用 c# 通过 wifi 将字符串从 android 设备发送到 windows 机器。这可能吗?
- javascript - 如何使样式表仅可供一个组件 react.js 访问
- swift - UITextField 中的最大字符数以及仅限制数值