apache-kafka - 重试消费来自 Kafka 主题的消息
问题描述
我正在开发一个模块,它使用来自 Kafka 主题的消息并发布到下游系统。如果下游系统不可用,消费者不确认 Kakfa 消息。因此,当我的消费者在下游系统不可用时收到消息时,不会提交 kakfa 的偏移量。但是,如果我在下游系统启动后收到新消息并且当我确认该消息时,将提交最新的偏移量,并且消费者永远不会收到主题中没有偏移量提交的那些消息。
即假设我的消费者被消耗到偏移量4。当下游不可用时,消费者会收到两条消息,因此我的消费者没有提交偏移量。所以 toipc 中的消息数现在是 6,但偏移量仍然是 4。现在下游系统可用,消费者收到一条新消息(第 7 条消息)。由于下游没有问题,消费者确认第 7 条消息,主题的偏移量将设置为 7。
有什么方法可以让我的消费者在收到第 7 条消息之前收到第 5 条和第 6 条消息?我在实现中使用了spring cloud stream。
解决方案
看到这个答案。
您需要 aSeekToCurrentErrorHandler
并引发异常,以便重置偏移量。
推荐阅读
- reactjs - React - 警告:列表中的每个孩子都应该有一个唯一的“关键”道具
- java - 获取本地日期之间的日历周差
- kubernetes - 我可以为每个用户/公司拥有一个 K8s pod吗?
- javascript - 是否可以异步捕获画布图像以确保 WebGL 画布在交换期间当前不是空白/黑色?
- javascript - 我如何使用 fetch() 获取的 json 数据
- python - numpy 将 2D 矩阵重塑为对称矩阵数组(3D 数组),无需循环
- rundeck - 项目中 Rundeck 组的环境变量
- python - 根据其他列值生成列
- python - 熊猫,.agg('sum') 与 .sum()
- eclipse - Eclipse CDT 4.11.0 因未找到 SWT 库错误而崩溃