apache-kafka - org.apache.kafka.clients.consumer.RetriableCommitFailedException:偏移提交失败,出现可重试异常
问题描述
我在从 kafka 消费小批量并使用 commitAsync 时遇到了这个异常
couldn't ack 17 messages
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException
看起来 __consumer_offset 主题无法在 5 秒内复制(默认为 offsets.commit.timeout.ms)。
在我向 kafka 提交较大批次的同一应用程序的其他消费者中,我看不到此错误
config.put("client.id", InetAddress.getLocalHost().getHostAddress() + "_" + clientId + "_" + Thread.currentThread());
config.put("group.id", "some-id");
config.put("bootstrap.servers", clusterUrl);
config.put("auto.offset.reset", "latest");
config.put("heartbeat.interval.ms", 3000);
config.put("session.timeout.ms", 60000);
config.put("request.timeout.ms", 60000 + 5000);
config.put("enable.auto.commit", "false");
config.put("key.deserializer", StringDeserializer.class.getName());
config.put("value.deserializer", StringDeserializer.class.getName());
config.put("fetch.min.bytes", 1000000);
config.put("max.partition.fetch.bytes", 1000000);
config.put("fetch.max.wait.ms", 50);
什么会导致这种情况?
解决方案
这就是kafka connect的概念。当我们得到可重试异常时,消费者提交将不会发生,并且将再次重试同一批次。
默认重试 10 次,尝试间隔为 3 秒。
推荐阅读
- python-3.x - 计算列表中元素的频率并加入它们
- python - 从文本文件读取时无法正确编码字符串(编码为 sha256...)
- java - 如何实现对象特定的方法?
- google-sheets - 在 Google 表格中跨多个工作表程序引用数据
- elasticsearch - ping 期间发现的主节点不足 - Elasticsearch 集群自签名证书
- android - 哪种方法更适合创建 Android 别名资源?
- c# - 使用位置运算符时自定义反序列化器失败
- jinja2 - 如何在 BashOperator 中执行 python 代码和气流宏?
- php - imap_open 很长的延迟
- javascript - 使消息返回所有数据库行