spring-boot - Spring Kafka在侦听器中按条件丢弃消息
问题描述
在我的 Spring Boot/Kafka 项目中,我有以下监听器:
@KafkaListener(topics = "${kafka.topic.update}", containerFactory = "updateKafkaListenerContainerFactory")
public void onUpdateReceived(ConsumerRecord<String, Update> consumerRecord, Acknowledgment ack) {
// do some logic
ack.acknowledge();
}
在侦听器内部,我需要根据我的业务逻辑检查某些条件,如果不满足 - 跳过处理此特定消息并让 Kafka 知道再次重新传递此消息。
我需要这个的原因 - 根据我的应用程序的业务逻辑,我需要避免每秒向特定的 Telegram 聊天发送超过一个帖子。这就是为什么我想检查 Kafka 监听器中的 chatLastSent 时间并在需要时推迟消息发送(通过消息重新传递到这个 Kafka 主题)
如何正确地做到这一点?我只需要ack.acknowledge();
这次不执行还是有另一种更合适的方法来实现它?
解决方案
当您抛出异常时,容器将调用错误处理程序,该处理程序将重新查找未处理的消息,以便在下一次轮询时再次获取它们。
推荐阅读
- swift - 如何为 UICollectionViewCompositionalLayout 中的不同部分设置多个背景视图
- visual-studio-code - VSCode“显示修复”的语言与文本编辑器本身不同
- mysql - 在 MySQL 中检测返回的空集,然后打印一条消息
- c++ - C ++忽略字符串中的\ x,如何使其成为纯文本?
- android - 谷歌播放控制台中的孤立采购订单
- json - Discord API:获取应用程序名称
- asp.net-core - 如何从 Rest API 中过滤掉堆栈跟踪信息并简单地返回 'Status Code: 500; 内部服务器错误'?
- r - Dplyr 变异,列输出数量可变
- vim - 如何保存和同步 vim 寄存器(宏)
- node.js - 从声明文件中创建类自动间返回类型