spring-kafka - 从 Spring Boot 应用程序调用 ConsumConsumerSeekCallback 的 seek
问题描述
这是我的设置:
ConsumerSeekAware 实现:
public class ReplayJobKafkaConsumer implements ConsumerSeekAware, AcknowledgingMessageListener<String, String> {
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
}
private static final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();
private static ConsumerSeekCallback consumerSeekCallback;;
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.seekCallBack.set(callback);
consumerSeekCallback = callback;
}
public void onMessage(final ConsumerRecord<String, String> data, final Acknowledgment acknowledgment) {
}
public static ThreadLocal<ConsumerSeekCallback> getSeekCallback(){
return seekCallBack;
}
public static ConsumerSeekCallback getAnotherSeekCallback(){
return consumerSeekCallback;
}
}
我的 Spring Boot 应用程序近似于:
@SpringBootApplication
public class ReplayJobApplication{
...
public void run(final String... args){
context = SpringApplication.run(ReplayJobApplication.class, args);
ReplayJobKafkaConsumer.getAnotherSeekCallback().seek("top", 0, 23);
}
...}
上述设置有效。现在我可以使用
java -jar -Dstart.offset=0....
但它只有在 seekcallback 变量不是 ThreadLocal 时才有效。我需要在 Spring Boot 应用程序中可以访问它,因为这就是我打算运行此使用者的方式。TEMP-TOPIC 的其他消费者仍然可以处理,但我打算根据需要运行此消费者,并带有开始和结束偏移量。虽然可以在消费者中读取命令行参数,但我担心的是
回调变量是静态的(我不可能创建 ReplayJobKafkaConsumer 的实例
它是一个普通变量而不是 ThreadLocal
虽然这个容器的生命周期只是从头到尾,我想知道这个设置是否有缺陷,需要一些确认这个实现是好的。
解决方案
你似乎对正在发生的事情有一些基本的误解。
之所以
ThreadLocal
需要,是因为 Kafka 消费者对象不是线程安全的。如果将回调存储在 ThreadLocal 中,则可以在运行时执行任意查找操作 - 可以从方法中执行,也可以在没有消息时onMessage
侦听。ListenerContainerIdleEvent
您不能
ReplayJobKafkaConsumer.getAnotherSeekCallback().seek("top", 0, 23);
从另一个线程执行任意搜索。在分配分区之前,您不能执行任意搜索。
因此,正如我在其他答案/评论中告诉您的那样,您必须在分配分区时进行搜索。
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
// Do the seeks here using the `consumerSeekCallback` parameter.
}
ConsumerSeekAware
使用现代版本的 spring-kafka,除非您想在运行时执行任意搜索(在初始搜索之后),否则您不需要使用。您可以使用 aConsumerAwareRebalanceListener
代替。
推荐阅读
- java - Jersey 客户端 POST 请求正文大小限制
- pandas - 在 Spark 中使用 Pandas udf 与 Facebook 先知进行预测
- javascript - 将 JSON 分配给输入中的值
- apache-kafka - Mirror Maker 2 生产者主题镜像场景
- python - 由于此错误,无法在 Pycharm 中导入 kivy:ModuleNotFoundError: No module named 'kivy'
- javascript - Meteor.js 中超出标准输出 maxBuffer 长度
- python - 如何在 MySQL 数据库中存储 PDF 而无需在 Python 中生成 PDF 文件
- c# - 不使用 C# 将数据插入 SQL Server 数据库
- php - 如何在类内的多维数组中插入变量
- c++ - 在 C++ 中的继承类中声明构造函数时出错