spring - 重新启动侦听器并从最新消息继续
问题描述
案子
- 客户是
ReplyingKafkaTemplate
实例。 - 服务器是在方法上
ConcurrentMessageListenerContainer
使用@KafkaListener
和@SendTo
注释创建的。 - ContainerFactory 使用
ContainerStoppingErrorHandler
. - 请求主题只有 1 个分区。
- 组 ID 是静态的。例如。测试消费者群体。
- 请求发送超时。
- 由于抛出异常,服务器关闭,但客户端继续调度在请求主题上排队的请求。
当前行为
当服务器重新启动时,它会继续处理可能会超时的旧请求。
期望的行为
相反,最好继续最后一条消息;从而跳过甚至未处理的消息,因为相应的请求会超时并重试。
问题
- 实现这一目标的推荐方法是什么?
- 据我了解,看来我必须手动设置初始偏移量。实现这一点的最简单方法是什么?
解决方案
你的@KafkaListener
班级必须extends AbstractConsumerSeekAware
并且做这样的事情:
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
super.onPartitionsAssigned(assignments, callback);
callback.seekToEnd(assignments.keySet());
}
因此,每次当您的消费者加入组时,它都会寻找所有分配的分区,直到最后跳过所有旧记录。
推荐阅读
- android - Kotlin 中是否有条件 try 语句,就像在 Swift 中一样?
- javascript - 触摸屏不支持点击事件
- android - 房间何时从数据库中删除实体?
- r - vapply 和矩阵;返校问题
- python - 循环连接两个文件以创建新文件?
- php - 如何打印表格的所有结果而不需要逐个变量打印?
- html - 如何在 Expressjs 中使用 GET 方法
- javascript - 带有加载按钮的网页抓取动态网页
- python-3.x - 通过使用 YAML、python、html、css,我们不能在不使用 NLP 的情况下创建标准聊天机器人吗?
- python - 继承视图的问题 - Odoo [v13] 模块