java - 如何在spring-kafka中配置kafka消费者轮询的频率
问题描述
我正在尝试在我的 Spring Boot 项目中使用 spring-kafka 从我的 kafka 中读取消息。我正在使用@KafkaListener,但问题是我的消费者一直在运行。一旦我从控制台生成一条消息,它就会在我的应用程序中弹出。我想定期轮询。我怎样才能做到这一点?
@Service
public class KafkaReciever {
private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaReciever.class);
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "test")
public void receive(String payload) {
LOGGER.info("received payload='{}'", payload);
latch.countDown();
}
}
这是我的消费者配置:
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kafka cluster
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// allows a pool of processes to divide the work of consuming and processing records
props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo1");
// automatically reset the offset to the earliest offset
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
解决方案
这就是它的设计方式;它是一个消息驱动的容器(并且与其他 Spring 消息传递技术抽象一致 - RabbitMQ、JMS 等)。
要仅按需获取消息,您有 2 个选择:
- 使用消费者工厂创建消费者,订阅(或分配)主题/分区并调用
poll()
- 使用
spring-integration-kafka
'sKafkaMessageSource
并调用receive()
在这两种情况下,如果您使用的是 kafka 组管理,则需要注意max.poll.interval.ms
避免重新平衡。
您可以使用 spring 集成入站通道适配器定期轮询消息源。
推荐阅读
- google-sheets - 我如何与列进行比较并自动返回一个数字:示例:=IFS(AND(B2="Groups",N2="Blood Donation"),4)
- xml - 第 x 行的 xml 解析错误。文档末尾的额外内容
- c - 找不到 gtkbuilder.c 的日志域名
- python - 返回所有单个字母的集合,而不是每个单词的集合
- javascript - 如何在iOS中停止双击缩放整个页面
- javascript - 在另一个函数中使用一个函数 - javascript
- powershell - 在 PowerShell 中使用 Test-Connection 时如何返回布尔值和 TestConnectionCommand+PingStatus?
- laravel - 使用 if/else 运算符的 Laravel 查询
- java - 将 2 个不同大小的二维数组加在一起
- python - 如何搜索包含不同值的多行?