apache-kafka - 如何设置 Kafka 消费者配置以从现在开始消费消息?
问题描述
我是 Kakfa 的新手,正在学习如何在 Kafka 主题中生成和使用消息。
我正在使用 @EnableKafka 使用 Kafka 配置
@EnableKafka
@Configuration
public class ConsumerConfig implements ApplicationContextAware {
@Value("${kafka.servers}")
private String kafkaServerAddress;
@Value("${kafka.ca.groupid}")
private String groupId;
private ApplicationContext context;
public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> binlogListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
DefaultKafkaConsumerFactory<String, Object> defaultFactory = consumerFactory();
defaultFactory.setKeyDeserializer(new StringDeserializer());
defaultFactory.setValueDeserializer(new JsonDeserializer(BinlogMessage.class));
factory.setConsumerFactory(defaultFactory);
return factory;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
context = applicationContext;
}
}
解决方案
得到答案,可以通过将属性 AUTO_OFFSET_RESET_CONFIG 设置为 latest 来完成,如下所示:
public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new DefaultKafkaConsumerFactory<>(props);
}
推荐阅读
- r - 为什么训练数据上的线性回归拟合值和预测值存在差异?
- firebase-authentication - 托管公司托管的域上的 Firebase Web 登录?
- vue.js - 如何对 vue 中的变量进行单元测试并用 jest 进行 vuetify?
- matlab - matlab在线脚本中如何取消注释代码
- mysql - 如何在 MySQL Workbench 8.0 中加载本地数据?
- php - 如何在路由laravel中传递两个不同的ID?
- reactjs - 我的反应应用程序未按预期呈现。为什么会这样?
- z3py - How to get result from z3py calculation?
- python - 在 Pycharm 上安装 xgboost 时出现问题
- java - 如何重新排序双向链表以匹配具有相同元素的数组的顺序?