首页 > 解决方案 > 重新启动侦听器并从最新消息继续

问题描述

案子

  1. 客户是ReplyingKafkaTemplate实例。
  2. 服务器是在方法上ConcurrentMessageListenerContainer使用@KafkaListener@SendTo注释创建的。
  3. ContainerFactory 使用ContainerStoppingErrorHandler.
  4. 请求主题只有 1 个分区。
  5. 组 ID 是静态的。例如。测试消费者群体。
  6. 请求发送超时。
  7. 由于抛出异常,服务器关闭,但客户端继续调度在请求主题上排队的请求。

当前行为

当服务器重新启动时,它会继续处理可能会超时的旧请求。

期望的行为

相反,最好继续最后一条消息;从而跳过甚至未处理的消息,因为相应的请求会超时并重试。

问题

  1. 实现这一目标的推荐方法是什么?
  2. 据我了解,看来我必须手动设置初始偏移量。实现这一点的最简单方法是什么?

标签: springapache-kafkaspring-kafka

解决方案


你的@KafkaListener班级必须extends AbstractConsumerSeekAware并且做这样的事情:

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        super.onPartitionsAssigned(assignments, callback);
        callback.seekToEnd(assignments.keySet());
    }

因此,每次当您的消费者加入组时,它都会寻找所有分配的分区,直到最后跳过所有旧记录。


推荐阅读