首页 > 解决方案 > 提交分配给 Kafka 使用者的分区时出错。CommitFailedException:提交无法完成,因为组已经重新平衡

问题描述

使用 KafkaConsumer.commitSync() 提交时出现以下错误。使用 KafkConsumer.assign() 分配分区并进行轮询。轮询完成后,尝试提交时,会抛出以下错误。我也尝试过 AutoCommit,但没有运气。

仅当另一组消费者在同一消费者组中运行另一个进程时才会发生此问题,该消费者组基于模式订阅主题。但是,该模式与引发错误的消费者订阅的分区不匹配。一旦我停止该过程,就不会引发以下错误。

错误:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1256) ~[ext-lib-1.0.0.jar:1.3]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1163) ~[ext-lib-1.0.0.jar:1.3]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1164) ~[ext-lib-1.0.0.jar:1.3]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1139) ~[ext-lib-1.0.0.jar:1.3]
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[ext-lib-1.0.0.jar:1.3]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[ext-lib-1.0.0.jar:1.3]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[ext-lib-1.0.0.jar:1.3]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[ext-lib-1.0.0.jar:1.3]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[ext-lib-1.0.0.jar:1.3]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[ext-lib-1.0.0.jar:1.3]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[ext-lib-1.0.0.jar:1.3]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[ext-lib-1.0.0.jar:1.3]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1005) ~[ext-lib-1.0.0.jar:1.3]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1389) ~[ext-lib-1.0.0.jar:1.3]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1343) ~[ext-lib-1.0.0.jar:1.3]

标签: javaapache-kafkakafka-consumer-api

解决方案


推荐阅读