首页 > 解决方案 > 模拟 Kafka CommitFailedException

问题描述

我正在尝试模拟 Kafka 抛出的 CommitFailedException。

我手动将“session.timeout.ms”设置为 10000 ms,将“enable.auto.commit”设置为 false。

在 Kafkaconsumer.poll() 之后,我得到了语句 Thread.sleep(12000),之后我进行了提交。我希望由于线程在下一次轮询之前需要 12 秒,因此应该将使用者标记为已死亡并且应该抛出 CommitFailedException。但是,该过程执行顺利。

我如何模拟 KafkaConsumer 抛出的异常。

consumer.subscribe(Arrays.asList("foo"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }

            try {
                Thread.sleep(12000);
            }catch (Exception e){
                e.printStackTrace();
            }
            consumer.commitSync();
        }

标签: apache-kafkakafka-consumer-api

解决方案


Kafka 通过单独的线程使用心跳机制来检查消费者的健康状况。session.timeout.ms消费者心跳线程必须在时间到期之前向代理发送心跳。

heartbeat.interval.ms:使用 Kafka 的组管理工具时,消费者协调器的心跳之间的预期时间。心跳用于确保消费者的会话保持活跃,并在新消费者加入或离开组时促进重新平衡。

session.timeout.ms:使用 Kafka 的组管理工具时用于检测客户端故障的超时时间。客户端定期发送心跳以向代理指示其活跃性。如果在此会话超时到期之前代理未收到任何心跳,则代理将从组中删除此客户端并启动重新平衡。

另一种检查消费者活跃度的机制是轮询。消费者应该 poll() 而不会过期max.poll.interval.ms。如果这个时间到期(通常长时间运行的进程会导致这个问题),消费者再次被认为是死的。

max.poll.interval.ms:使用消费者组管理时调用 poll() 之间的最大延迟。这为消费者在获取更多记录之前可以空闲的时间量设置了上限。如果在此超时到期之前未调用 poll(),则认为消费者失败,组将重新平衡,以便将分区重新分配给另一个成员。

session.timeout.ms如果消费者因为没有心跳或没有民意调查而被 Kafka 认为已死亡,则max.poll.interval.ms消费者无法提交消息并获取CommitFailedException

CommitFailedException:当使用 KafkaConsumer.commitSync() 的偏移提交失败并出现不可恢复的错误时,会引发此异常。当组重新平衡在成功应用提交之前完成时,可能会发生这种情况。在这种情况下,通常无法重试提交,因为某些分区可能已经分配给组中的另一个成员。

因此; 因为心跳线程是一个单独的线程,所以代码中的睡眠不会影响它。但在您的情况下,您可以设置max.poll.interval.ms为 10 秒来获取CommitFailedException.


推荐阅读