Kafka Rebalancing Issue

Problem Description

I have run into an issue with Kafka. We receive data for each product as XML through Kafka, then use Spark Streaming to process the XML and insert the data into Hive. Processing one product XML takes about 30-40 seconds, so we set the consumer poll timeout to 60 seconds so that it polls the next record after 60 seconds. We will have to process 90k product XML records; this is a one-time activity, and afterwards any product updated in my application will simply flow through Kafka and be ingested into the data lake.

Since the topic has 14 partitions, a single consumer reads records from the partitions and does the job fine, but as soon as we add one more instance/consumer to the consumer group, it starts failing with the error below.

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

What I understand from this error is that if processing a record takes a long time and we do not poll for an extended period, the group coordinator may assume the consumer is dead and trigger a rebalance. When a rebalance is triggered this way, because we have not polled for a while or something else has gone wrong, the current partitions are taken away and assigned to other consumers. In that case we want to commit whatever we have already processed before ownership of the partitions is taken away, and the new owner of a partition should start consuming from the last committed offset.
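
For reference, the two remedies the error message suggests map to two consumer configs; a minimal sketch with illustrative values only (note that on 0.10.1+ clients the time between polls is bounded by max.poll.interval.ms rather than session.timeout.ms):

import java.util.Properties

val props = new Properties()
// Remedy 1: allow more time between poll() calls before the coordinator
// assumes the consumer is dead (on 0.10.1+ clients, max.poll.interval.ms
// is the setting that bounds poll spacing instead)
props.put("session.timeout.ms", "120000")
// Remedy 2: return fewer records per poll() so each processing cycle stays short
props.put("max.poll.records", "1")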

To commit specific offsets, we keep committing intermediate offsets rather than committing only the current offset at the end. And to know when a rebalance is about to be triggered, so that we can commit what we have already processed, we added a rebalance listener to the consumer: it commits offsets when partitions are revoked, before they are assigned to other consumers.
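
Committing a specific offset uses the commitSync(Map) overload of the consumer API; a minimal sketch, assuming consumer and the just-processed ConsumerRecord record are in scope (note that Kafka expects the offset of the next record to consume, hence the + 1):

import java.util.Collections
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

// Commit exactly one partition's position: the offset after the record we just handled
consumer.commitSync(Collections.singletonMap(
  new TopicPartition(record.topic(), record.partition()),
  new OffsetAndMetadata(record.offset() + 1)))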

The Kafka rebalance listener API lets us supply a consumer rebalance listener class with two methods, onPartitionsRevoked and onPartitionsAssigned. onPartitionsRevoked is called just before the partitions are taken away, so we can commit our current offsets there. onPartitionsAssigned is called after the rebalance completes and before the consumer starts consuming records from the newly assigned partitions.
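
The listener only takes effect if it is passed to subscribe(); a minimal sketch of the wiring, assuming props is built as in the configuration dump below and using "product-xml" as a placeholder topic name:

import java.util.Collections
import org.apache.kafka.clients.consumer.KafkaConsumer

val consumer = new KafkaConsumer[String, String](props)  // props as in the configuration below
val rebalanceListner = new RebalanceListner(consumer)    // the listener class attached below
// The listener's callbacks now run around every rebalance for this consumer
consumer.subscribe(Collections.singletonList("product-xml"), rebalanceListner)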

When we go live, we need to process the 90k records in parallel with 14 consumers, so we added the rebalance listener class to make that work (code attached below). With the rebalance listener API in place, when we add another consumer a rebalance is triggered, since there is a new consumer and new partitions to read. Kafka then revokes all partitions from the first consumer because the group membership changed, both consumers get new partitions assigned, and each of them ends up with an equal share of distinct partitions.
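
A rough throughput check, assuming ~35 s per record (the midpoint of the 30-40 s stated above), shows why all 14 partitions/consumers are needed:

object ThroughputEstimate extends App {
  val records = 90000
  val secondsPerRecord = 35.0
  val consumers = 14
  // Total work divided evenly across consumers, converted to hours
  val hours = records * secondsPerRecord / consumers / 3600
  println(f"$hours%.1f hours of processing with $consumers consumers") // ~62.5 hours
}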

After all of this, the same problem still persists. Could you help us resolve it?

Consumer configuration values:

metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [d.messagebroker.genmills.com:9092, d.messagebroker.genmills.com:9092]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = consumer-1
ssl.endpoint.identification.algorithm = null
max.poll.records = 1
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = PIM-Consumer
retry.backoff.ms = 100
ssl.secure.random.implementation = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest
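
For readability, here is how the settings most relevant to this problem would look when building the consumer; a sketch reproducing only values from the dump above:

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "d.messagebroker.genmills.com:9092")
props.put("group.id", "PIM-Consumer")
props.put("enable.auto.commit", "false")  // offsets are committed manually with commitSync
props.put("max.poll.records", "1")        // one product XML per poll
props.put("session.timeout.ms", "30000")  // 30 s, while processing one record takes 30-40 s
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](props)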

My code:

    import java.time.LocalDateTime
    import scala.collection.JavaConverters._

    try {
      while (true) {
        // Poll with a 60-second timeout; processing one product XML takes 30-40 s
        val records = consumer.poll(60000)
        if (records != null && !records.isEmpty) {
          println("record count " + records.count())
          records.asScala.foreach { record =>
            val messageValue = record.value.toString
            println("start timestamp -> " + LocalDateTime.now().toString)
            try {
              // Strip surrounding double quotes, if present, before validating the XML
              if (messageValue.length >= 2 && messageValue.head == '"' && messageValue.last == '"') {
                xmltags_check(messageValue.substring(1, messageValue.length - 1))
              } else {
                xmltags_check(messageValue)
              }
              // Track the next offset to consume (processed offset + 1, per the Kafka
              // convention) and commit it synchronously after each record
              rebalanceListner.addoffset(record.topic(), record.partition(), record.offset() + 1)
              consumer.commitSync(rebalanceListner.getCurrentoffsets())
            } catch {
              case e: Exception =>
                logError(s"Failed to process Message{$messageValue} with error details{$e}")
            }
            println("end timestamp -> " + LocalDateTime.now().toString)
          }
        } else {
          // poll() returned no records within the timeout
          logInfo(s"running step to data lake - in else no record found")
        }
      }
    } catch {
      case e: Exception =>
        logError(s"Failed to poll records from consumer{$e}")
    }

RebalanceListner:


import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class RebalanceListner implements ConsumerRebalanceListener {
    private final KafkaConsumer<String, String> consumer;
    // Offsets processed so far, keyed by partition
    private final Map<TopicPartition, OffsetAndMetadata> currentoffsets = new HashMap<>();

    public RebalanceListner(KafkaConsumer<String, String> con) {
        this.consumer = con;
    }

    // Record the position for a partition; offset should be the next offset to consume
    public void addoffset(String topic, int partition, long offset) {
        currentoffsets.put(new TopicPartition(topic, partition), new OffsetAndMetadata(offset, "commit"));
    }

    public Map<TopicPartition, OffsetAndMetadata> getCurrentoffsets() {
        return currentoffsets;
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Following Partitions Assigned ..");
        for (TopicPartition partition : partitions)
            System.out.println(partition.partition() + ",");
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit whatever has been processed before ownership of the partitions is taken away
        System.out.println("Following Partitions Revoked ..");
        for (TopicPartition partition : partitions)
            System.out.println(partition.partition() + ",");
        System.out.println("Following Partitions Committed ..");
        for (TopicPartition tp : currentoffsets.keySet())
            System.out.println(tp.partition());
        consumer.commitSync(currentoffsets);
        currentoffsets.clear();
    }
}

Tags: apache-kafka, kafka-consumer-api, kafka-interactive-queries

Solution

