Akka-stream-kafka: adding consumers to a consumer group triggers a rebalance, and the revoked partitions cause a CommitFailedException

Problem description

I think this is related to #539, but I don't know whether it is a bug or something the user is supposed to handle themselves.

So I have a consumer group, and whenever I increase the number of consumers in that group, the partition revocation causes the following error:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:778)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:617)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:584)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1479)
    at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$commit(KafkaConsumerActor.scala:430)
    at akka.kafka.internal.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:210)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.kafka.internal.KafkaConsumerActor.akka$actor$Timers$$super$aroundReceive(KafkaConsumerActor.scala:142)
    at akka.actor.Timers.aroundReceive(Timers.scala:55)
    at akka.actor.Timers.aroundReceive$(Timers.scala:40)
    at akka.kafka.internal.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:142)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
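
The exception message itself names the two knobs: max.poll.interval.ms and max.poll.records. As a minimal sketch of that tuning against the consumerSettings shown further below (the values here are placeholders, not recommendations):

    // Sketch only: placeholder values illustrating the exception message's advice.
    val tunedSettings = consumerSettings
      .withProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000") // allow up to 5 minutes between poll() calls
      .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100")        // smaller batches per poll()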

This does not happen when I decrease the number of consumers; at least I have not observed it so far. I assume that is because partitions are not revoked on scale-down: the remaining consumers simply receive additional partitions.
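
If the goal is simply to have fewer partitions revoked during a rebalance, a sticky assignment strategy is one option; a sketch, assuming kafka-clients 2.4+ for the cooperative variant (this is not part of my current setup):

    import org.apache.kafka.clients.consumer.CooperativeStickyAssignor

    // Sketch: with the cooperative sticky assignor, only partitions that
    // actually move between consumers are revoked during a rebalance.
    val stickyConsumerSettings = consumerSettings.withProperty(
      ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
      classOf[CooperativeStickyAssignor].getName)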

Note that I group messages and commit them in batches.

Here is what my code looks like:

val source = Consumer.committableSource(consumerSettings, subscription)
      .async
      .groupBy(Int.MaxValue, computeNamedGraph)
      .groupedWithin(conf.tripleStoreSettings.batchSize, conf.tripleStoreSettings.batchWindowSec seconds)
      .map(toUpdateStatements)
      .async
      .mergeSubstreams
      .map(toHttpRequest)
      .map(p => p.data -> p)
      .via(poolClientFlow)
      .async
      .map { case (response, payload) => Payload(response, payload.offsets) }
      .mapConcat(handleResponse)
      .via(Committer.flow(committerDefaults.withMaxBatch(conf.tripleStoreSettings.batchSize)))

    val (killSwitch, streamResults) = source
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.ignore)(Keep.both)
      .run()

    streamResults.onComplete {
      case Success(_) =>
        logger.info("Stream finished")
        system.terminate()
      case Failure(e) =>
        logger.error("Stream failed:", e)
        system.terminate()
    }
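
As an aside, the killSwitch materialized above is what lets the stream be stopped from outside; for example (a sketch, not in my original code):

    // Hypothetical shutdown hook: completing the stream lets the Sink.ignore
    // future resolve, which runs the onComplete handling above.
    sys.addShutdownHook {
      killSwitch.shutdown()
    }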

My supervision decider simply does the following:

private val decider: Supervision.Decider = {
  e => {
    logger.error(s"Stream failed. ${e.getMessage} ${e.getStackTrace.map(_.toString).mkString("\n")}", e)
    Supervision.Stop
  }
}
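
A decider only takes effect once it is attached to the stream; for context, a sketch of that wiring (my snippet above does not show it):

    import akka.stream.ActorAttributes

    // Sketch: apply the supervision strategy to the whole graph before running it.
    val supervisedSource = source.withAttributes(ActorAttributes.supervisionStrategy(decider))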

So from my reading of #539, I understand that I have many in-flight messages waiting to be committed, but because of the revocation I cannot commit them. In other words, when the number of consumers increases, a rebalance takes place that involves revoking partitions.

My service is at-least-once, so I do not mind if other consumers reprocess those messages. We have no at-most-once delivery constraint.

My question is: until the library handles these situations natively, how can I keep committing those offsets whenever a revocation happens, or better, simply discard them so that the consumer assigned their partitions will reprocess the messages?

Any suggestions? I checked the BalanceListener, but I am not sure how to use it in this situation.
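
For what it's worth, here is roughly how I understand the rebalance listener hook could be wired: an Alpakka Kafka AutoSubscription can carry an ActorRef that receives TopicPartitionsAssigned and TopicPartitionsRevoked messages, which at least makes revocations observable. A minimal sketch (logging only; it does not by itself solve the commit problem):

    import akka.actor.{Actor, ActorLogging, Props}
    import akka.kafka.{Subscriptions, TopicPartitionsAssigned, TopicPartitionsRevoked}

    // Sketch: an actor that logs rebalance events so revocations become visible.
    class RebalanceListener extends Actor with ActorLogging {
      def receive: Receive = {
        case TopicPartitionsAssigned(_, assigned) =>
          log.info("Partitions assigned: {}", assigned)
        case TopicPartitionsRevoked(_, revoked) =>
          log.info("Partitions revoked: {}", revoked)
      }
    }

    val rebalanceListener = system.actorOf(Props[RebalanceListener])
    val listeningSubscription = Subscriptions
      .topicPattern(conf.kafkaConsumer.sourceTopic)
      .withRebalanceListener(rebalanceListener)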

Note my timeout configuration:

val subscription = Subscriptions.topicPattern(conf.kafkaConsumer.sourceTopic)

val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
  .withBootstrapServers(conf.kafkaBroker.bootstrapServers)
  .withGroupId(conf.kafkaConsumer.groupId)
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, conf.kafkaConsumer.offsetReset)
  .withProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000000") // 5,000,000 ms ≈ 83 minutes
  .withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100000")    // 100,000 ms = 100 seconds

Tags: apache-kafka, kafka-consumer-api, akka-stream, alpakka

Solution

