apache-kafka - 模拟 Kafka CommitFailedException
问题描述
我正在尝试模拟 Kafka 抛出的 CommitFailedException。
我手动将“session.timeout.ms”设置为 10000 ms,将“enable.auto.commit”设置为 false。
在 Kafkaconsumer.poll() 之后,我得到了语句 Thread.sleep(12000),之后我进行了提交。我希望由于线程在下一次轮询之前需要 12 秒,因此应该将使用者标记为已死亡并且应该抛出 CommitFailedException。但是,该过程执行顺利。
我如何模拟 KafkaConsumer 抛出的异常。
consumer.subscribe(Arrays.asList("foo"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
try {
Thread.sleep(12000);
}catch (Exception e){
e.printStackTrace();
}
consumer.commitSync();
}
解决方案
Kafka 通过单独的线程使用心跳机制来检查消费者的健康状况。session.timeout.ms
消费者心跳线程必须在时间到期之前向代理发送心跳。
heartbeat.interval.ms:使用 Kafka 的组管理工具时,消费者协调器的心跳之间的预期时间。心跳用于确保消费者的会话保持活跃,并在新消费者加入或离开组时促进重新平衡。
session.timeout.ms:使用 Kafka 的组管理工具时用于检测客户端故障的超时时间。客户端定期发送心跳以向代理指示其活跃性。如果在此会话超时到期之前代理未收到任何心跳,则代理将从组中删除此客户端并启动重新平衡。
另一种检查消费者活跃度的机制是轮询。消费者应该 poll() 而不会过期max.poll.interval.ms
。如果这个时间到期(通常长时间运行的进程会导致这个问题),消费者再次被认为是死的。
max.poll.interval.ms:使用消费者组管理时调用 poll() 之间的最大延迟。这为消费者在获取更多记录之前可以空闲的时间量设置了上限。如果在此超时到期之前未调用 poll(),则认为消费者失败,组将重新平衡,以便将分区重新分配给另一个成员。
session.timeout.ms
如果消费者因为没有心跳或没有民意调查而被 Kafka 认为已死亡,则max.poll.interval.ms
消费者无法提交消息并获取CommitFailedException
。
CommitFailedException:当使用 KafkaConsumer.commitSync() 的偏移提交失败并出现不可恢复的错误时,会引发此异常。当组重新平衡在成功应用提交之前完成时,可能会发生这种情况。在这种情况下,通常无法重试提交,因为某些分区可能已经分配给组中的另一个成员。
因此; 因为心跳线程是一个单独的线程,所以代码中的睡眠不会影响它。但在您的情况下,您可以设置max.poll.interval.ms
为 10 秒来获取CommitFailedException
.
推荐阅读
- c# - 我应该在单元测试中使用依赖注入容器吗?
- dart - Flutter camera frame drops when switching between pages
- javascript - 如何检查 html 元素是否包含它自己的文本(包含 textNode)
- oracle - Function to query data import date when there is no "created at" column in Oracle
- firebase - Slow function execution from google iot core MQTT modifyCloudToDeviceConfig()
- .net - How to pass variable list in ViewBag
- java - Insert into Hive ORC table by nested POJO
- c# - 从 NOSQL 数据库中读出 Double 和 DateTime
- installshield - Evaluate if variable contains a value (installscript)
- javascript - Javascript - 将 1 转换为真(示例)