java - 如何在spring kafka Listener中找出是否是重试消息
问题描述
我们正在尝试在 spring boot kafka 侦听器中解决边缘情况:
@KafkaListener(topics="topicName", groupId="consumerGoupId")
public void consumeNotification(@Payload Datarequest request,
@Header(KafkaHeaders.OFFSET) Long offset,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID)Integer partitionId)
{
processRequest(request);
}
现在由于某种原因,当 processRequest 正在进行时,我的 spring boot 应用程序会关闭,偏移量没有被提交,当它启动时,侦听器会再次接收相同的消息进行处理。
我的用例是:有没有办法知道侦听器在重试过程中收到了这条消息?
因为由于应用程序崩溃导致进程请求在两者之间停止,如果相同的消息再次被侦听器拾取,我必须编写一些逻辑processRequest
以从它停止的位置开始处理。
解决方案
正确的; 唯一的保证是至少一次交货;无法知道消息是否已重新传递(重新启动后 - 我们可以判断它是否是同一应用程序实例中的重新传递 - 请参阅https://docs.spring.io/spring-kafka/docs/current/reference /html/#delivery-header)。
由于您无法在所有情况下都知道,因此您必须使您的侦听器具有幂等性 - 例如,通过将主题/分区/偏移量与数据一起存储,以便您可以检查它是否已被处理。
推荐阅读
- sql-server - 如何使用 IdentityServer 4 和 SQL Server?
- javascript - 谷歌折线图只有完整的步骤/刻度
- html - 向水平滚动容器的第一个和最后一个元素添加边距
- sql - 为 RowsPerPage 和 PageNumber 创建 Helper
- matlab - 如何使用 HMM matlab Statistics and Machine Learning Toolbox 使用 hmm 进行分类?
- java - 参考值与原始值的区别
- python - 我必须比较学生是否超过 51 与 FOR
- html - CSS+Vueitfy 填充剩余高度并在需要时添加滚动条
- android - 为 Gson 数据类设置 Dynamic SerializedName 注解
- google-apps-script - 我如何以 Gdocs 中的表格为中心(使用 appendTable() 从 Gsheet 导入表格)