java - 重新消费未提交偏移量的消息
问题描述
我有一个自定义的 Kafka 消费者,用于向 REST API 发送一些请求。根据 API 的响应,我要么提交偏移量,要么在不提交的情况下跳过消息。
最小的例子:
while (true) {
ConsumerRecords<String, Object> records = consumer.poll(200);
for (ConsumerRecord<String, Object> record : records) {
// Sending a POST request and retrieving the answer
// ...
if (responseCode.startsWith("2")) {
try {
consumer.commitSync();
} catch(CommitFailedException ex) {
ex.printStackTrace();
}
} else {
// Do Nothing
}
}
}
现在,当来自 REST API 的响应不以 a 开头时,2
不会提交偏移量,但不会重新使用消息。如何强制消费者重新使用未提交偏移量的消息?
解决方案
提交偏移量只是存储消费者当前偏移量(也称为位置)的一种方式。因此,如果它停止,它(或接管的新消费者实例)可以找到它以前的位置并从那里重新开始消费。
因此,即使您不提交,一旦您收到记录,消费者的位置也会移动。如果你想重新消费一些记录,你必须改变消费者的当前位置。
使用 Java 客户端,您可以使用seek()
.
在您的场景中,您可能想要计算相对于当前位置的新位置。如果是这样,您可以使用 找到当前位置position()
。
推荐阅读
- angular - 是否有一些简单的方法可以使 ui5 webcomponent(例如 ui5-input)与角度组件模型保持同步?
- docker-compose - 构建 docker-compose 镜像时使用 Alpine 的本地镜像,而不是遗留存储库
- mysql - JPA使复合外键的一部分成为主键的一部分
- python - 读取嵌入在 pptx 图表中的数据
- python - 将训练/测试拆分为确切的行数
- python - Gekko:计算一个变量,它是先前和当前时间步的值的函数
- scala - Scala,有趣的隐式拼图
- spring-boot - JSON deserializer conflict with Spring Boot Starter Web and Spring Cloud Stream
- javascript - Javascript 检查年份范围是否与其他范围冲突
- python - 分组并爆炸pyspark数组类型列