spring-kafka - 使用过滤策略和手动提交的spring kafka批处理示例
问题描述
我打算使用 spring kafka 批处理侦听器进行批处理。我正在为这两种情况寻找一些样本。
- 我们如何通过批处理实现过滤记录策略?更新:来自文档-“此外,还提供了一个 FilteringBatchMessageListenerAdapter,供您使用批处理消息侦听器时使用。” 不清楚。我没有看到任何容器工厂方法来设置此 filterbatchmessagelisteneradapter 对象或过滤器实现。
这是我的批处理侦听器过滤策略代码:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setBatchListener(true);
factory.setAckDiscarded(true);
factory.setRecordFilterStrategy(new RecordFilterStrategy<Object, Object>() {
@Override
public boolean filter(ConsumerRecord<Object, Object> consumerRecords) {
//log.info("Retrieved the record {} from the partition {} with offset {}", consumerRecord.value(), consumerRecord.partition(), consumerRecord.offset());
return true;
}
});
return factory;
}
- 一旦我们在消费者中检索到这批消息并全部处理完毕,我们如何进行手动偏移提交。在批处理过程中,如果出现任何故障,只想将该消息推送到错误主题。但最后我想一次提交整个批处理。
现在我想到的另一个问题是上述场景如何与单个消费者和多个消费者一起工作。
假设案例 1:单一消费者
假设我们有一个包含 5 个分区的主题。当我们订阅该主题时,我们假设我们从该主题中获得了 100 条消息,其中每个分区有 20 条消息。如果我们要提交这些消息偏移量,确认对象是否保存最后一条消息的每个分区和最后一个偏移量?
案例2:多个消费者
使用与 case1 中提到的相同输入,如果我们启用具有分区计数的相等消费者数量,则 ack 对象是否保存分区和最后一条消息偏移量?
你能帮我解决这个问题吗?
解决方案
见
FilteringBatchMessageListenerAdapter
https://docs.spring.io/spring-kafka/docs/current/reference/html/#filtering-messages使用批处理处理异常的最简单方法是使用 a
RecoveringBatchErrorHandler
和 aDeadLetterPublishingRecoverer
。抛出 aBatchListenerFailedException
表示批次中哪条记录失败;成功记录的偏移量被提交,剩余的记录(包括失败的记录)将被重新传递,直到重试(如果配置)用尽,失败的记录将进入死信主题,其余的将被重新传递。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#recovering-batch-eh
是的,当批次被确认时,批次中每个分区的最新偏移量 (+1) 将被提交。
如果您有多个消费者,则分区将分布在这些消费者之间。
推荐阅读
- react-native - 从 yield put 设置 redux 状态...需要很多时间
- java - 从 Dto 转换为实体后 getId 上的 NPE(mapstruct 和 modelmapper)
- javascript - 如何让我的 JavaScript 保释我访问未定义的属性?
- c++ - 分配数组时出现异常,将其分配为二维数组时也不例外
- java - 如何将对象添加到对象数组列表中的数组列表?
- javascript - MongoDB .remove() 匹配多个条件
- java - Java:如何在具有多个类的同时比较数组中的对象
- flutter - 按钮未显示,出现 120 溢出错误
- neo4j - 在 Neo4J 中创建具有相同属性的节点之间的关系
- amazon-ec2 - 如何在两个 EC2 实例之间设置 AWS 负载平衡