spring - Spring-Kafka Concurrency Property
Problem description
I am writing my first Kafka consumer using Spring-Kafka. I have looked at the different options the framework provides and have a few doubts about them. Could someone who has already worked with it please clarify the points below?
Question 1: According to the Spring-Kafka documentation, there are two ways to implement a Kafka consumer: "You can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation". When should I choose one option over the other?
Question 2: I have chosen the KafkaListener approach for my application. For this I need to initialize a container factory instance, and the container factory has an option to control concurrency. I just want to double-check whether my understanding of concurrency is correct.
Suppose I have a topic named MyTopic with 4 partitions. To consume messages from MyTopic, I start 2 instances of my application, each with concurrency set to 2. Ideally, per the Kafka assignment strategy, 2 partitions should go to consumer1 and the other 2 partitions should go to consumer2. Since the concurrency is set to 2, will each consumer start 2 threads and consume data from the topic in parallel? Also, is there anything we should consider when consuming in parallel?
Question 3: I have chosen manual ack mode and am not managing the offsets externally (not persisting them to any database or filesystem). Do I need to write custom code to handle rebalances, or will the framework manage them automatically? I think not, as I acknowledge only after processing all the records.
Question 4: Also, with manual ack mode, which listener gives better performance: the batch message listener or the normal message listener? I guess that if I use the normal message listener, the offsets will be committed after processing each message.
I have pasted the code below for reference.
Batch Acknowledgement Consumer:
public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment,
        Consumer<?, ?> consumer) {
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Record : " + record.value());
        // Process the message here..
        listener.addOffset(record.topic(), record.partition(), record.offset());
    }
    acknowledgment.acknowledge();
}
Initialising container factory:
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> configs = new HashMap<String, Object>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enablAutoCommit);
    configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPolInterval);
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    configs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return configs;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    // Not sure about the impact of this property, so going with 2
    factory.setConcurrency(2);
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    factory.getContainerProperties().setConsumerRebalanceListener(RebalanceListener.getInstance());
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setMessageListener(new BatchAckConsumer());
    return factory;
}
Solution
@KafkaListener is a message-driven "POJO"; it adds features such as payload conversion and argument matching. If you implement MessageListener, you can only get the raw ConsumerRecord from Kafka. See the "@KafkaListener Annotation" section of the documentation.
Yes, the concurrency represents the number of threads; each thread creates a Consumer, and they run in parallel. In your example, each application instance would get 2 partitions, which normally means one partition per consumer thread.
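To make the arithmetic concrete, here is a minimal sketch of how 4 partitions could be spread over 2 instances with concurrency 2. It is a toy model, not Spring-Kafka or Kafka client code: the class PartitionSpread and its assign method are hypothetical, and it assumes a range-style split of a single topic. In a real group, which consumer receives which partition depends on the configured assignor and the member IDs; only the counts are illustrated here. With these numbers each instance ends up with 2 partitions, one per consumer thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical) of how one topic's partitions are spread over a consumer group. */
final class PartitionSpread {

    /**
     * Range-style split of one topic: partitions are handed out in contiguous blocks,
     * and the first (partitions % consumers) consumers receive one extra partition.
     * Returns, for each consumer, the list of partition numbers it would own.
     */
    static List<List<Integer>> assign(int partitions, int instances, int concurrency) {
        int consumers = instances * concurrency; // one Kafka consumer per container thread
        int base = partitions / consumers;
        int extra = partitions % consumers;
        List<List<Integer>> result = new ArrayList<>();
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // MyTopic: 4 partitions, 2 application instances, concurrency 2
        System.out.println(assign(4, 2, 2)); // [[0], [1], [2], [3]]
        // Same topic with concurrency 3: two of the six consumers stay idle
        System.out.println(assign(4, 2, 3)); // [[0], [1], [2], [3], [], []]
    }
}
```

The second call shows why raising concurrency beyond the number of partitions divided by the number of instances tends not to help: the extra consumers have nothing to read.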
"Also, should we consider anything if we are consuming in parallel?"
Your listener must be thread-safe: either it has no shared state, or any shared state is protected by locks.
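As one possible illustration of thread-safe shared state, the sketch below keeps per-topic counters in concurrent data structures instead of using explicit locks. RecordCounter and its methods are hypothetical names, not part of Spring-Kafka; the two plain threads only stand in for the two consumer threads that concurrency = 2 would create.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical listener-side helper: per-topic counters that are safe to update from several threads. */
final class RecordCounter {

    // ConcurrentHashMap + LongAdder: concurrent increments need no explicit locking
    private final Map<String, LongAdder> countsByTopic = new ConcurrentHashMap<>();

    /** Called from any consumer thread once a record has been processed. */
    void recordProcessed(String topic) {
        countsByTopic.computeIfAbsent(topic, t -> new LongAdder()).increment();
    }

    /** Returns how many records have been counted for the topic so far. */
    long count(String topic) {
        LongAdder adder = countsByTopic.get(topic);
        return adder == null ? 0L : adder.sum();
    }

    public static void main(String[] args) throws InterruptedException {
        RecordCounter counter = new RecordCounter();
        // Two threads stand in for the two consumer threads created by concurrency = 2
        Runnable work = () -> {
            for (int i = 0; i < 10_000; i++) {
                counter.recordProcessed("MyTopic");
            }
        };
        Thread t1 = new Thread(work);
        Thread t2 = new Thread(work);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(counter.count("MyTopic")); // 20000
    }
}
```

A plain HashMap with a long field would lose updates under the same load, which is the kind of bug the thread-safety requirement is meant to rule out.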
It's not clear what you mean by "handle rebalance events". When a rebalance occurs, the framework will commit any pending offsets.
It doesn't make a difference; message listener vs. batch listener is just a preference. Even with a message listener, with MANUAL ack mode, the offsets are committed when all the results from the poll have been processed. With MANUAL_IMMEDIATE mode, the offsets are committed one by one.
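The difference between the two modes can be pictured with a small toy model. This is not framework code: AckModeModel, its Mode enum, and commitsForPoll are hypothetical stand-ins that only mimic the commit timing described above, for a record listener that acknowledges every record of a single poll on one partition. It assumes, as Kafka does, that the committed value is the offset of the next record to read.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical) of when offsets get committed for one poll under each manual ack mode. */
final class AckModeModel {

    /** Stand-in for the two manual ack modes; not the Spring-Kafka enum. */
    enum Mode { MANUAL, MANUAL_IMMEDIATE }

    /** Returns the sequence of offsets committed while every record of one poll is acknowledged. */
    static List<Long> commitsForPoll(Mode mode, long firstOffset, int recordsInPoll) {
        List<Long> commits = new ArrayList<>();
        long pending = -1L; // latest acknowledged position not yet committed
        for (int i = 0; i < recordsInPoll; i++) {
            long nextToRead = firstOffset + i + 1;
            // the listener has processed the record and now acknowledges it
            if (mode == Mode.MANUAL_IMMEDIATE) {
                commits.add(nextToRead); // committed right away, one commit per record
            } else {
                pending = nextToRead;    // queued until the whole poll has been processed
            }
        }
        if (mode == Mode.MANUAL && pending >= 0) {
            commits.add(pending);        // a single commit at the end of the poll
        }
        return commits;
    }

    public static void main(String[] args) {
        System.out.println(commitsForPoll(Mode.MANUAL, 100, 3));           // [103]
        System.out.println(commitsForPoll(Mode.MANUAL_IMMEDIATE, 100, 3)); // [101, 102, 103]
    }
}
```

With a batch listener that acknowledges once per batch, both modes would produce a single commit in this model, which fits the point that the listener type is mostly a matter of preference.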