首页 > 解决方案 > 致谢 Kafka Producer Apache Beam

问题描述

如何获取在 apache Beam KafkaIO 中收到确认的记录?

基本上,我希望我没有得到任何确认的所有记录都转到一个 bigquery 表,以便我可以稍后重试。我使用了文档中的以下代码片段

    .apply(KafkaIO.<Long, String>read()
       .withBootstrapServers("broker_1:9092,broker_2:9092")
       .withTopic("my_topic")  // use withTopics(List<String>) to read from multiple topics.
       .withKeyDeserializer(LongDeserializer.class)
       .withValueDeserializer(StringDeserializer.class)

       // Above four are required configuration. returns PCollection<KafkaRecord<Long, String>>

       // Rest of the settings are optional :

       // you can further customize KafkaConsumer used to read the records by adding more
       // settings for ConsumerConfig. e.g :
       .updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))

       // set event times and watermark based on LogAppendTime. To provide a custom
       // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
       .withLogAppendTime()

       // restrict reader to committed messages on Kafka (see method documentation).
       .withReadCommitted()

       // offset consumed by the pipeline can be committed back.
       .commitOffsetsInFinalize()

       // finally, if you don't need Kafka metadata, you can drop it.g
       .withoutMetadata() // PCollection<KV<Long, String>>
    )
    .apply(Values.<String>create()) // PCollection<String>

标签: apache-kafkagoogle-cloud-dataflowapache-beamapache-beam-io

解决方案


默认情况下,Beam IO 旨在不断尝试写入/读取/处理元素,直到 . (重复错误后批处理管道会失败)

您所指的通常称为死信队列,用于获取失败的记录并将它们添加到 PCollection、Pubsub 主题、排队服务等。这通常是可取的,因为它允许流式管道取得进展(不是块),当遇到写入某些记录的错误时,但允许写入成功的一次。

不幸的是,除非我弄错了,否则 Kafka IO 中没有实现死信队列。可以修改 KafkaIO 来支持这一点。在 Beam 邮件列表上进行了一些讨论,提出了一些实现这一点的想法,这可能有一些想法

我怀疑可以将它添加到KafkaWriter中,捕获失败的记录并将它们输出到另一个 PCollection。如果您选择实现此功能,请同时联系 beam社区邮件列表,如果您需要帮助将其合并到 master 中,他们将能够帮助确保更改涵盖必要的要求,以便可以合并并作为一个有意义的整体为梁。

然后,您的管道可以将它们写入其他地方(即不同的来源)。当然,如果该辅助源同时出现中断/问题,您将需要另一个 DLQ。


推荐阅读