首页 > 解决方案 > 从 Spring Boot 应用程序调用 ConsumConsumerSeekCallback 的 seek

问题描述

这是我的设置:

ConsumerSeekAware 实现:

public class ReplayJobKafkaConsumer implements ConsumerSeekAware, AcknowledgingMessageListener<String, String> {

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

    }

    private static final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

    private static ConsumerSeekCallback consumerSeekCallback;;

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.seekCallBack.set(callback);
        consumerSeekCallback = callback;
    }

    public void onMessage(final ConsumerRecord<String, String> data, final Acknowledgment acknowledgment) {
    }

    public static ThreadLocal<ConsumerSeekCallback> getSeekCallback(){
        return seekCallBack;
    }

    public static ConsumerSeekCallback getAnotherSeekCallback(){
        return consumerSeekCallback;
    }
}

我的 Spring Boot 应用程序近似于:

@SpringBootApplication
public class ReplayJobApplication{
...
public void run(final String... args){
        context = SpringApplication.run(ReplayJobApplication.class, args);
        ReplayJobKafkaConsumer.getAnotherSeekCallback().seek("top", 0, 23);
    }
...}

上述设置有效。现在我可以使用

java -jar -Dstart.offset=0....

但它只有在 seekcallback 变量不是 ThreadLocal 时才有效。我需要在 Spring Boot 应用程序中可以访问它,因为这就是我打算运行此使用者的方式。TEMP-TOPIC 的其他消费者仍然可以处理,但我打算根据需要运行此消费者,并带有开始和结束偏移量。虽然可以在消费者中读取命令行参数,但我担心的是

  1. 回调变量是静态的(我不可能创建 ReplayJobKafkaConsumer 的实例

  2. 它是一个普通变量而不是 ThreadLocal

虽然这个容器的生命周期只是从头到尾,我想知道这个设置是否有缺陷,需要一些确认这个实现是好的。

标签: spring-kafka

解决方案


你似乎对正在发生的事情有一些基本的误解。

  1. 之所以ThreadLocal需要,是因为 Kafka 消费者对象不是线程安全的。如果将回调存储在 ThreadLocal 中,则可以在运行时执行任意查找操作 - 可以从方法中执行,也可以在没有消息时onMessage侦听。ListenerContainerIdleEvent

  2. 您不能ReplayJobKafkaConsumer.getAnotherSeekCallback().seek("top", 0, 23);从另一个线程执行任意搜索。

  3. 在分配分区之前,您不能执行任意搜索。

因此,正如我在其他答案/评论中告诉您的那样,您必须在分配分区时进行搜索。

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
    // Do the seeks here using the `consumerSeekCallback` parameter.
}

ConsumerSeekAware使用现代版本的 spring-kafka,除非您想在运行时执行任意搜索(在初始搜索之后),否则您不需要使用。您可以使用 aConsumerAwareRebalanceListener代替。


推荐阅读