首页 > 解决方案 > Kafka:使用 Apache Beam 进行一次语义配置

问题描述

我正在尝试在 Kafka(Apache Beam)中配置一次语义。以下是我将要介绍的更改:

制片人:

  1. enable.idenpotence= 真

  2. transactional.id= uniqueTransactionalId

消费者:

  1. 设置enable.auto.commit=假

    // 将以下内容添加到消费者构建器:

  2. .commitOffsetsInFinalize()

  3. .withReadCommitted()

在构建器中添加了以下内容KafkaIO#write

  1. .withEOS(numShards, sinkGroupId)

有谁知道在 Apache Beam KafkaIO 中还应该改变什么来实现一次语义?

上面的配置看起来不错还是我误解了smth?

transactional.id如果我不使用事务 API(因为我在 apache Beam 中没有明确的生产者),是否需要指定属性?

标签: apache-kafkagoogle-cloud-dataflowapache-beamapache-beam-io

解决方案


好吧,看起来我终于找到了符合我要求的正确设置。这是我最终得到的结果:

1)KafkaIO.Read

  • 更新消费者属性enable.auto.commit = false
  • .withReadCommitted()
  • .commitOffsetsInFinalize()

2)KafkaIO#write

  • .withEOS(numShards, sinkGroupId)

    它还将启用幂等性并transactional.id为生产者设置幕后。

因此,通过这样的设置,我们将在读取时具有至少一次语义,在写入时具有精确一次语义。


推荐阅读