首页 > 解决方案 > 如何在 apache kafka 中处理异常和消息重新处理

问题描述

我有一个卡夫卡集群。只有一个主题,对于这个主题,3 个不同的消费者组从主题中获取相同的消息,并根据自己的逻辑进行不同的处理。

为多个消费者组创建相同的主题有什么问题吗?

我有这个疑问,因为我正在尝试实现异常主题并尝试重新处理此消息。假设,我在主题 A 中有消息“秘密”。我的所有 3 个消费者组都收到了消息“秘密”。我的 2 个消费组成功完成了消息的处理。但是对于我的一个消费组未能处理该消息。

所以我将消息保留在主题“failed_topic”中。我想尝试为失败的消费者处理此消息。但是,如果我将此消息保留在我的实际主题 A 中,则其他 2 个消费者组会第二次处理此消息。

有人可以告诉我如何为这种情况实施完美的再处理吗?

标签: apache-kafka

解决方案


首先,在 Kafka 中,每个消费者组对于订阅的每个主题分区都有自己的偏移量,这些偏移量由消费者组单独管理。因此,在一个消费者群体中失败不会影响其他消费者群体。

您可以使用以下 cli 命令检查使用者组的当前偏移量:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

为多个消费者组创建相同的主题有什么问题吗

不,没有问题。实际上,这是基于主题的发布者/订阅者模式的正常行为。


要实现重新处理逻辑,需要考虑一些要点:

  • 即使您正在重新处理相同的消息,您也应该继续调用 poll()。否则以后max.poll.interval.ms你的消费者将被视为死亡并被撤销。
  • 通过调用 poll() 您将收到您的消费者组尚未阅读的消息。因此,当您 poll() 时,您将 max.poll.records在再次 poll() 时收到消息,这次您将收到下一组消息。因此,要重新处理失败的消息,您需要调用 seek 方法。

public void seek(TopicPartition partition, long offset) :覆盖消费者将在下一次轮询时使用的获取偏移量(超时)

  • 理想情况下,消费者组中的消费者数量应等于订阅主题的分区数量。Kafka 会负责将分区均匀地分配给消费者。(每个消费者一个分区)但是即使一开始就满足这个条件,一段时间后消费者可能会死亡,Kafka可能会为一个消费者分配多个分区。这可能会导致一些问题。假设您的消费者负责两个分区,当您 poll() 时,您将从这两个分区获取消息,并且当无法使用消息时,您应该查找分配的所有分区(不仅仅是一个失败的消息来自)。否则,您可能会跳过一些消息。

让我们尝试使用这些信息编写一些伪代码来实现异常情况下的重新处理逻辑:

public void consumeLoop() {
    while (true) {
        currentRecord = consumer.poll(); //max.poll.records = 1
        if (currentRecord != null) {
            try {
                processMessage(currentRecord);
            } catch (Exception e) {
                consumer.seek(new TopicPartition(currentRecord.topic(), currentRecord.partition()), currentRecord.offset());
                continue;
            }
            consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(currentRecord.offset() + 1)));
        }
    }
}

关于代码的注释:

  • max.poll.records 设置为 1 以简化搜索过程。
  • 在每个异常中,我们都会寻求并轮询以再次获得相同的消息。(我们必须投票才能被卡夫卡认为是活着的)
  • auto.commit 被禁用

推荐阅读