apache-kafka - 读取事务写入的最后一个 Kafka 分区记录
问题描述
我正在尝试读取主题分区的最后一条记录。该主题的生产者正在事务性地写作。消费者设置为isolation.level=read_committed
。我手动管理偏移量。这是我的消费者代码:
// Fetch the end offset of a partition
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Collections.singleton(topicPartition));
Long endOffset = endOffsets.get(topicPartition);
// Try to read the last record
if (endOffset > 0) {
consumer.seek(topicPartition, Math.max(0, endOffset - 5));
List records = consumer.poll(10 * 1000).records(topicPartition);
if (records.isEmpty()) {
throw new IllegalStateException("How can this be?");
} else {
// Return the last record
return records.get(records.size() - 1);
}
}
所以要读取最后一条记录,我要求结束偏移量,然后寻找endOffset - 5
(因为 Kafka 在完全一次模式下跳过偏移量,就像我在这个问题中看到的那样:Kafka Streams does not increase offset by 1 when production to topic) ,并开始轮询。
它以前一直运行良好,但现在我有一个异常告诉我轮询零记录。这可能是什么原因?我无法复制它,我想我迷路了。
解决方案
推荐阅读
- flutter - 来自 api 的表情符号无法正确显示
- javascript - 忽略 jshint 中特定变量的驼峰式检查
- wordpress - MVC 方式的 WordrPess 插件
- node.js - Node Cron 执行具体功能
- grails - 在 Grails 4 中使用 Micronaut HTTP 客户端进行参数编码
- mobile - 没有方法签名:静态 com.kms.katalon.core.testobject.ObjectRepository.findTestObject() 适用于参数类型:(java.lang.String,
- ios - iOS:是否可以以编程方式触发 LED 闪烁以发出警报?
- java - 如何在 Java Canvas 中绘制 JSON 文本?
- ios - 根据“if”语句发出警报
- python - 当函数在 Pandas 数据框中工作时,PySpark udf 返回 null