首页 > 解决方案 > 如何在不手动分配分区的情况下实现 Exactly-Once Kafka Consumer

问题描述

我正在阅读这篇文章,该文章解释了如何通过执行以下操作确保消息只被处理一次:

如您所见,它明确指定从哪个分区读取消息。我觉得这不是一个好主意,因为它不允许 Kafka 将公平份额的分区分配给活跃的消费者。在消费者内部轮询 kafka 主题时,如果没有明确指定分区,我无法提供实现类似功能的逻辑。有可能吗?

标签: apache-kafkakafka-consumer-api

解决方案


好分析。你有一个很好的观点,如果可能的话,你当然应该让 kafka 处理分配给消费者的分区。

consumer.Assign(Partition[]) 有一个替代方案。当分区被撤销或分配给消费者时,kafka 代理将通知您的消费者。例如,dotnet 客户端库有一个“SetPartitionsRevoked”和“SetPartitionsAssigned”处理程序,消费者可以使用它们来管理他们的偏移量。

撤销分区时,将要撤销的每个分区的最后处理偏移量保存到数据库。分配新分区时,从数据库中获取该分区的最后处理偏移量并使用它。

C# 示例:

public class Program
{
   public void Main(string[] args)
   {

      using (
         var consumer = new ConsumerBuilder<string, string>(config)
                      .SetErrorHandler(ErrorHandler)
                      .SetPartitionsRevokedHandler(HandlePartitionsRevoked)
                      .SetPartitionsAssigned(HandlePartitionsAssigned)
                      .Build()
      )
      {
         while (true)
         {
            consumer.Consume()//.Poll()
         }
      }
   }

   public IEnumerable<TopicPartitionOffset> 
   HandlePartitionsRevoked
   (
      IConsumer<string, string> consumer, 
      List<TopicPartitionOffset> currentTopicPartitionOffsets
   )
   {
      Persist(<last processed offset for each partition in 
      'currentTopicPartitionOffsets'>);
      return tpos;
   }

   public IEnumerable<TopicPartitionOffset> HandlePartitionsAssigned
   (
      IConsumer<string, string> consumer, 
      List<TopicPartition> tps
    )
   {
      List<TopicPartitionOffset> tpos = FetchOffsetsFromDbForTopicPartitions(tps);
      return tpos
   }
}

ConsumerRebalanceListener Docs中的 Java 示例:

如果用 Java 编写,您可以实现一个“ConsumerRebalanceListener”接口。然后,您将接口的实现传递给 consumer.Subscribe(topic, listener) 方法。下面的示例逐字取自上面链接的 kafka 文档:

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
       private Consumer<?,?> consumer;

       public SaveOffsetsOnRebalance(Consumer<?,?> consumer) {
           this.consumer = consumer;
       }

       public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
           // save the offsets in an external store using some custom code not described here
           for(TopicPartition partition: partitions)
              saveOffsetInExternalStore(consumer.position(partition));
       }

       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
           // read the offsets from an external store using some custom code not described here
           for(TopicPartition partition: partitions)
              consumer.seek(partition, readOffsetFromExternalStore(partition));
    }
}

如果我的理解是正确的,你会这样调用 java 版本:consumer.Subscribe("My topic", new SaveOffsetsOnRebalance(consumer)).

有关更多信息,请参阅kafka 文档的“在 Kafka 之外存储偏移量”部分。

以下是这些文档的摘录,总结了如何存储分区和偏移量以进行精确一次处理:

每条记录都有自己的偏移量,因此要管理自己的偏移量,您只需执行以下操作:

  • 配置 enable.auto.commit=false
  • 使用每个 ConsumerRecord 提供的偏移量来保存您的位置。
  • 重启时使用 seek(TopicPartition, long) 恢复消费者的位置。

当分区分配也是手动完成时,这种类型的使用是最简单的(这很可能在上面描述的搜索索引用例中)。如果分区分配是自动完成的,则需要特别注意处理分区分配发生变化的情况。这可以通过在调用 subscribe(Collection, ConsumerRebalanceListener) 和 subscribe(Pattern, ConsumerRebalanceListener) 时提供一个 ConsumerRebalanceListener 实例来完成。例如,当从消费者那里获取分区时,消费者将希望通过实现 ConsumerRebalanceListener.onPartitionsRevoked(Collection) 来为这些分区提交偏移量。当分区分配给消费者时,

ConsumerRebalanceListener 的另一个常见用途是刷新应用程序为移动到其他地方的分区维护的任何缓存。


推荐阅读