spring-boot - 在 Spring 中消费 Kafka 消息
问题描述
通过遵循本教程,我能够创建一个简单的生产者-消费者示例。在我的示例中,只有 1 个主题,我正在听那个主题。因此,其中的代码ReceiverConfig
是有意义的。特别是围绕GROUP_ID_CONFIG
ie,我创建主题topic_name
,然后在此配置中配置它。现在我的问题是,如果我有超过 1 个主题怎么办。假设我有topic_1
,topic_2
等等?我应该ReceiverConfig
为每个单独的主题创建吗?
@EnableKafka
@Configuration
public class ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(GROUP_ID_CONFIG, "topic_name");
props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
解决方案
简短的回答是否定的,您不需要为每个主题创建多个配置。
在继续之前,我认为最好指定groupId
消费者进程所属的组和消费者进程所消费的组topic
是两个不同的东西。
通过下面的句子,您将告诉消费者它属于topic_name组,仅此而已。
props.put(GROUP_ID_CONFIG, "topic_name");
如果您希望消费者从多个主题中读取数据,则有一个订阅方法,它接收一个集合作为参数,这样您就可以指定所有主题来读取数据,而无需为每个主题创建新配置。
请检查这个例子,你会看到我提到的方法
// Subscribe to the topic.
consumer.subscribe(Collections.singletonList(TOPIC));
推荐阅读
- c# - Combine 2 integers and store it in a ulong
- r - 将具有共同值的行放入列表中
- docker - Is there any difference between CMD ["./start.sh"] and CMD ./start.sh in docker file?
- mysql - MySQL Round() 函数用于存储在字符串中的十进制
- jpa - JPQL 聚合函数与规范
- javascript - 无法在 react-native jest 中记录酶浅包装器的值
- sql - 从 1 开始计数
- neural-network - Keras 中 add_loss 函数的作用是什么?
- java - 在非 Spring 项目中运行 Spring Cloud Contract 测试
- reactjs - 反应路由器 | 私有路由器上的呼叫功能