java - 在 spring kafka consumer 1.2.x 中寻求方法行为
问题描述
我不想为处理失败的那些消息提交偏移量,我希望它们再次被重新传递以进行处理。我正在使用 spring-kafka 1.2.x 并在我的侦听器中实现了ConsumerSeekAware 。
@Component
public class Listener implements ConsumerSeekAware {
private static Logger logger = LoggerFactory.getLogger(Listener.class);
private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();
@KafkaListener(topics = "my-topic", containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen1(ConsumerRecord<String, String> consumerRecord) throws MyCustomException {
logger.info("received: key - " + consumerRecord.key() + " value - " + consumerRecord.value());
// Below code is just to show the issue.Not acknowledging so I can get the same msg again.
boolean should_commit = false;
if(should_commit) {
ack.acknowledge();
}
else {
this.seekCallBack.get().seek(consumerRecord.topic(), consumerRecord.partition() , consumerRecord.offset());
}
}
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
logger.info("registerSeekCallback called..");
this.seekCallBack.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
logger.info("onPartitionsAssigned called..");
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
logger.info("onIdleContainer called..");
}
}
#########Container 配置(auto.commit 在消费者中为假)
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
我面临的问题是,如果我在一个主题的不同分区中有 10 条消息,那么我会一一获取所有消息,并且在获取所有消息后,我会继续获取任何分区的最后一条消息。我还尝试了 SeekToCurrentErrorHandler,它在 2.0.x 版本中实现并且运行良好。但我无法升级我的 kafka 版本。如果我重新启动容器,我会再次收到所有消息,这很好,但我不想在消息处理失败时停止容器。
所以我的问题是是否有可能获得与spring-kafka 1.2.x中的SeekToCurrentErrorHandler相同的行为(完全一样而无需停止容器) ?
解决方案
推荐阅读
- spring-boot - 如何修复 Spring Boot DataSource:未指定“url”属性
- terraform - terraform:ec2 机器更改不会触发 dns 更改
- julia - &-ing Julia 中的两个位数组?
- django - 如何从 Django 应用程序发出 GraphQL api 请求?
- amazon-web-services - 我们可以为 AWS API Gateway 执行日志创建自定义路径吗
- css - 无法停止使用 flexbox 拉伸元素
- python - 为什么 from tkinter import * 不导入 Tkinter 的消息框?
- javascript - 如何使用 Node JS http get 方法下载大量远程文件而不会遇到错误
- python - 熊猫将行移动到单列并重塑数据框
- hana - 在 SAP HANA 上使用案例时出错:SAP DBTech JDBC:[403]:内部错误:打开游标时出错