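spring - Spring Kafka rebalance explanation
Question
I just want to understand Kafka rebalancing. My listener method is shown below. I have configured a RetryTemplate with the consumer factory to retry 20 times with a back-off delay of 20 seconds. I am using spring-kafka 1.2.2 (we plan to upgrade the client) with manual acknowledgment.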
@KafkaListener(id = "${kafka.listener-id}", topics = "${kafka.topic}")
public void listen(final ConsumerRecord<String, String> consumerRecord,
        final Acknowledgment acknowledgment) throws ServiceResponseException {
    // Always fail, so that every delivery goes through the retry path.
    if (true) {
        System.out.println("throwing exception ");
        throw new RuntimeException();
    }
    // Only reached once the forced failure above is removed.
    try {
        acknowledgment.acknowledge();
        LOGGER.info("Kafka acknowledgment sent for Transaction ID:");
    } catch (Exception e) {
        LOGGER.info("Exception encountered when acking record with transaction id: {}");
    }
}
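I have 2 workers, each with a concurrency of 2, and the Kafka topic has 3 partitions. I started one worker, and all 3 partitions were assigned to worker1. Then I sent one message. The listener threw a RuntimeException, which was set to repeat 20 times with a 20-second delay between attempts. When I then started worker2, a Kafka rebalance was triggered, but no partitions were assigned yet. worker1 failed with the message "Error while processing: ConsumerRecord" (after getContainerProperties().getShutdownTimeout() had elapsed), and only then did all consumers join the group. The same message was then delivered to worker2.
1) This works the way I need it to. But I have a question: when the rebalance is triggered, why does partition assignment not happen immediately? It waits for worker1 to stop completely (waiting for getContainerProperties().getShutdownTimeout()), and only then do all consumers from worker1 and worker2 join the group.
2) During the rebalance, I observed that the consumers stop calling poll (see the logs below). Is that true?
Trace log from worker1: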
2018-09-23 13:52:53.259 TRACE 6384 --- [ listener-2-L-1] essageListenerContainer$ListenerConsumer : No records to process
2018-09-23 13:52:53.259 TRACE 6384 --- [ listener-0-L-1] essageListenerContainer$ListenerConsumer : No records to process
2018-09-23 13:52:53.384 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:52:53.384 TRACE 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:52:53.977 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:52:53.977 TRACE 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:52:54.008 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:52:54.008 TRACE 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:52:54.023 INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [messages-0] for group mris-group
2018-09-23 13:52:54.023 TRACE 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received partition revocation notification, and will stop the invoker.
2018-09-23 13:52:54.023 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Stopping invoker
2018-09-23 13:52:54.081 INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [messages-1] for group mris-group
2018-09-23 13:52:54.081 TRACE 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received partition revocation notification, and will stop the invoker.
2018-09-23 13:52:54.081 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Stopping invoker
2018-09-23 13:52:54.241 INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [messages-2] for group mris-group
2018-09-23 13:52:54.241 TRACE 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received partition revocation notification, and will stop the invoker.
2018-09-23 13:52:54.241 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Stopping invoker
2018-09-23 13:52:54.264 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Invoker stopped
2018-09-23 13:52:54.264 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Invoker stopped
2018-09-23 13:52:54.264 INFO 6384 --- [ listener-2-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[messages-2]
2018-09-23 13:52:54.264 INFO 6384 --- [ listener-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[messages-0]
2018-09-23 13:52:54.264 INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group mris-group
2018-09-23 13:52:54.265 INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group mris-group
2018-09-23 13:53:09.355 DEBUG 6384 --- [ listener-1-L-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=<removed>]
throwing exception
2018-09-23 13:53:24.083 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Interrupting invoker
2018-09-23 13:53:24.083 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Invoker stopped
2018-09-23 13:53:24.085 INFO 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Invoker timed out while waiting for shutdown and will be canceled.
2018-09-23 13:53:24.085 INFO 6384 --- [ listener-1-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[messages-1]
2018-09-23 13:53:24.085 INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group mris-group
2018-09-23 13:53:24.101 ERROR 6384 --- [ listener-1-L-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = messages, partition = 1, offset = 0, CreateTime = 1537725149052, checksum = 3567644394, serialized key size = 27, serialized value size = 1952, key = test_hotfix1@test.com, value = <removed>])
org.springframework.retry.backoff.BackOffInterruptedException: Thread interrupted while sleeping; nested exception is java.lang.InterruptedException: sleep interrupted
at org.springframework.retry.backoff.FixedBackOffPolicy.doBackOff(FixedBackOffPolicy.java:86) ~[spring-retry-1.2.0.RELEASE.jar:na]
at org.springframework.retry.backoff.StatelessBackOffPolicy.backOff(StatelessBackOffPolicy.java:36) ~[spring-retry-1.2.0.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:305) ~[spring-retry-1.2.0.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:179) ~[spring-retry-1.2.0.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:73) ~[spring-kafka-1.2.2.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:39) ~[spring-kafka-1.2.2.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.FilteringAcknowledgingMessageListenerAdapter.onMessage(FilteringAcknowledgingMessageListenerAdapter.java:55) ~[spring-kafka-1.2.2.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.FilteringAcknowledgingMessageListenerAdapter.onMessage(FilteringAcknowledgingMessageListenerAdapter.java:34) ~[spring-kafka-1.2.2.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794) [spring-kafka-1.2.2.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) [spring-kafka-1.2.2.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2200(KafkaMessageListenerContainer.java:245) [spring-kafka-1.2.2.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1031) [spring-kafka-1.2.2.RELEASE.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_162]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_162]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
Caused by: java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method) [na:1.8.0_162]
at org.springframework.retry.backoff.ThreadWaitSleeper.sleep(ThreadWaitSleeper.java:30) ~[spring-retry-1.2.0.RELEASE.jar:na]
at org.springframework.retry.backoff.FixedBackOffPolicy.doBackOff(FixedBackOffPolicy.java:83) ~[spring-retry-1.2.0.RELEASE.jar:na]
... 14 common frames omitted
2018-09-23 13:53:24.101 INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.101 INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.102 INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.102 INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [messages-0] for group mris-group
2018-09-23 13:53:24.102 INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [messages-2] for group mris-group
2018-09-23 13:53:24.102 INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [] for group mris-group
2018-09-23 13:53:24.103 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Committing: {}
2018-09-23 13:53:24.103 INFO 6384 --- [ listener-2-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[]
2018-09-23 13:53:24.103 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {messages-0=OffsetAndMetadata{offset=0, metadata=''}}
2018-09-23 13:53:24.104 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Committing: {messages-2=OffsetAndMetadata{offset=0, metadata=''}}
2018-09-23 13:53:24.106 INFO 6384 --- [ listener-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[messages-0]
2018-09-23 13:53:24.107 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.107 TRACE 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.108 INFO 6384 --- [ listener-1-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[messages-2]
2018-09-23 13:53:24.108 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.108 TRACE 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.207 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.207 TRACE 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:25.111 TRACE 6384 --- [ listener-0-L-2] essageListenerContainer$ListenerConsumer : No records to process
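To put a number on the delay visible in this trace, the small sketch below measures two gaps using timestamps copied from the log lines above: from "Stopping invoker" to "Interrupting invoker" on listener-1-C-1, and from "(Re-)joining group" to "Successfully joined group" on listener-0-C-1. The class and method names are made up for illustration. Both gaps come out at roughly 30 seconds, which would be consistent with the group waiting on the container's shutdown timeout, assuming that timeout was set to about 30 seconds (the configured value is not shown in the question).

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: measures the gap between two timestamps taken from the trace.
final class TraceGap {

    // Timestamp layout used by the log lines in the question.
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Milliseconds elapsed from one log timestamp to another.
    static long millisBetween(String from, String to) {
        LocalDateTime start = LocalDateTime.parse(from, LOG_TIME);
        LocalDateTime end = LocalDateTime.parse(to, LOG_TIME);
        return Duration.between(start, end).toMillis();
    }

    public static void main(String[] args) {
        // listener-1-C-1: "Stopping invoker" -> "Interrupting invoker"
        long invokerWait = millisBetween("2018-09-23 13:52:54.081", "2018-09-23 13:53:24.083");
        // listener-0-C-1: "(Re-)joining group" -> "Successfully joined group"
        long rejoinWait = millisBetween("2018-09-23 13:52:54.265", "2018-09-23 13:53:24.102");
        System.out.println("invoker wait (ms): " + invokerWait);
        System.out.println("rejoin wait (ms):  " + rejoinWait);
    }
}
```

The two consumers that had nothing in flight (listener-0 and listener-2) asked to rejoin almost immediately, yet none of them was admitted until listener-1 also rejoined, which is what the second gap reflects.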
Trace log from worker2:
2018-09-23 13:53:24.102 INFO 6401 --- [ listener-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.104 INFO 6401 --- [ listener-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.105 INFO 6401 --- [ listener-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [] for group mris-group
2018-09-23 13:53:24.105 INFO 6401 --- [ listener-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [] for group mris-group
2018-09-23 13:53:24.105 DEBUG 6401 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Committing: {}
2018-09-23 13:53:24.105 DEBUG 6401 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Committing: {}
2018-09-23 13:53:24.105 INFO 6401 --- [ listener-2-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[]
2018-09-23 13:53:24.105 INFO 6401 --- [ listener-1-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[]
2018-09-23 13:53:24.106 INFO 6401 --- [ listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [messages-1] for group mris-group
2018-09-23 13:53:24.111 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {messages-1=OffsetAndMetadata{offset=0, metadata=''}}
2018-09-23 13:53:24.115 INFO 6401 --- [ listener-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[messages-1]
2018-09-23 13:53:24.118 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.118 TRACE 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.189 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 1 records
2018-09-23 13:53:24.189 TRACE 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.202 TRACE 6401 --- [ listener-0-L-1] essageListenerContainer$ListenerConsumer : Processing ConsumerRecord(topic = messages, partition = 1, offset = 0, CreateTime = 1537725149052, checksum = 3567644394, serialized key size = 27, serialized value size = 1952, key = test_hotfix1@test.com, value = <removed>)
2018-09-23 13:53:24.209 DEBUG 6401 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.209 DEBUG 6401 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.209 TRACE 6401 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.210 TRACE 6401 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.216 DEBUG 6401 --- [ listener-0-L-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=<removed>]
throwing exception
2018-09-23 13:53:25.194 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:25.194 TRACE 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
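The worker1 trace ends the failing delivery with "Interrupting invoker" followed by a BackOffInterruptedException ("sleep interrupted"). The sketch below reproduces that pattern in miniature with plain JDK threads. It is not Spring Kafka code: the retry loop, the outcome names, and the scaled-down durations (milliseconds instead of seconds) are all assumptions made for illustration. A listener thread sleeps between failed attempts, and a supervising thread waits a bounded time for it before interrupting it.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical simulation of a retrying listener that is interrupted after a shutdown timeout.
final class InvokerShutdownSketch {

    enum Outcome { COMPLETED, INTERRUPTED_DURING_BACKOFF }

    // Simplified fixed back-off retry loop: sleeps between failed attempts, gives up after maxAttempts.
    static void retryWithFixedBackOff(Runnable listener, int maxAttempts, long backOffMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.run();
                return; // success, no further attempts
            } catch (RuntimeException e) {
                if (attempt < maxAttempts) {
                    Thread.sleep(backOffMillis); // interruptible sleep between attempts
                }
            }
        }
    }

    // Starts the retrying listener, waits up to shutdownTimeoutMillis, then interrupts it if still running.
    static Outcome revokeWhileRetrying(int maxAttempts, long backOffMillis, long shutdownTimeoutMillis) {
        AtomicReference<Outcome> outcome = new AtomicReference<>();
        // Always fails, like the `if (true) throw` in the question's listener.
        Runnable alwaysFails = () -> { throw new RuntimeException("simulated listener failure"); };
        Thread invoker = new Thread(() -> {
            try {
                retryWithFixedBackOff(alwaysFails, maxAttempts, backOffMillis);
                outcome.set(Outcome.COMPLETED);
            } catch (InterruptedException e) {
                outcome.set(Outcome.INTERRUPTED_DURING_BACKOFF);
            }
        }, "listener-L-1");
        invoker.start();
        try {
            invoker.join(shutdownTimeoutMillis); // bounded wait, standing in for the shutdown timeout
            if (invoker.isAlive()) {
                invoker.interrupt();             // the "Interrupting invoker" step
                invoker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("simulation itself was interrupted", e);
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        // 20 attempts, 200 ms back-off, 300 ms timeout: scaled-down version of 20 x 20 s vs. about 30 s.
        System.out.println(revokeWhileRetrying(20, 200, 300));
        // A retry sequence short enough to finish before the timeout.
        System.out.println(revokeWhileRetrying(2, 50, 2000));
    }
}
```

In this simplified model, a retry sequence longer than the timeout is cut short during a back-off sleep. Because the offset was never acknowledged, the record is still uncommitted, which fits the observation that worker2 received the same message once messages-1 was assigned to it.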
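Answer
Versions prior to 1.3 had a very complicated threading model, designed to avoid a rebalance caused by a slow listener. KIP-62 allowed us to use a much simpler threading model in 1.3 and later.
1.2.x is no longer supported, and I don't have the time (or inclination) to go back and figure out what is happening. Please upgrade to 1.3.7 (or, even better, 2.1.10).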
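As a rough illustration of why a slow listener matters after KIP-62: that change moved heartbeats to a background thread and added the consumer setting `max.poll.interval.ms`, which bounds the time between calls to poll. With the listener running on the consumer thread in the simpler model, time spent retrying counts against that bound. The sketch below only does the arithmetic for the retry settings in the question. It assumes 20 attempts in total with a sleep between consecutive attempts, negligible processing time per attempt, and the Kafka client default of 300000 ms for `max.poll.interval.ms`. The class and method names are hypothetical.

```java
// Hypothetical helper: compares a worst-case retry duration against max.poll.interval.ms.
final class PollIntervalBudget {

    // Kafka consumer default for max.poll.interval.ms (5 minutes); assumed not overridden.
    static final long DEFAULT_MAX_POLL_INTERVAL_MS = 300_000L;

    // Worst case for one record: every attempt fails, with a back-off sleep between attempts.
    static long worstCaseListenerMillis(int maxAttempts, long backOffMillis, long perAttemptMillis) {
        long sleeps = Math.max(0, maxAttempts - 1);
        return maxAttempts * perAttemptMillis + sleeps * backOffMillis;
    }

    // True when the worst case stays below the poll interval limit.
    static boolean fitsWithin(long worstCaseMillis, long maxPollIntervalMillis) {
        return worstCaseMillis < maxPollIntervalMillis;
    }

    public static void main(String[] args) {
        long worstCase = worstCaseListenerMillis(20, 20_000L, 0L);
        System.out.println("worst case (ms): " + worstCase);
        System.out.println("fits default max.poll.interval.ms: "
                + fitsWithin(worstCase, DEFAULT_MAX_POLL_INTERVAL_MS));
    }
}
```

Under these assumptions the retries alone take 380 seconds, which exceeds the 300-second default, so after upgrading it would be worth either shortening the retry sequence or raising `max.poll.interval.ms` accordingly.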