spring-kafka - 记录过滤策略问题(春季启动:2.3.8)。过滤后的消息一次又一次地进入过滤器
问题描述
我正在研究 spring kafka 批处理侦听器过滤策略。我面临一个问题,过滤的事件一次又一次地出现。有人可以帮我解决这个问题吗?带有kafka版本的弹簧靴(2.3.8)
这是我的配置:
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new
ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setBatchListener(true);
factory.setAckDiscarded(true);
factory.getContainerProperties().setIdleBetweenPolls(30000);
factory.setRecordFilterStrategy(
(consumerRecord) -> {
MyObject myObject = new ObjectMapper().readValue(consumerRecord.value(), MyObj.class);
if (myObject.frequency > 10) {
return false;
} else {
return true;
}});
factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
解决方案
当使用带有手动确认的批处理模式时,如果您过滤所有记录(全部丢弃),侦听器将获得一个空列表,因此您仍然可以确认批处理以提交偏移量。
我刚刚对其进行了测试,它按预期工作。
@SpringBootApplication
public class So67259790Application {
public static void main(String[] args) {
SpringApplication.run(So67259790Application.class, args);
}
@KafkaListener(id = "so67259790", topics = "so67259790")
public void listen(List<String> in, Acknowledgment ack) {
System.out.println(in);
ack.acknowledge();
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so67259790").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so67259790", "foo");
template.send("so67259790", "bar");
};
}
@Bean
public RecordFilterStrategy<Object, Object> rfs() {
return rec -> true;
}
}
推荐阅读
- json - Json.Net JsonConstructor 属性替代 System.Text.Json
- apache-spark - Sparkly_Apply 与 Coxph 函数
- mongodb - 确保遵守 Kafka、Springboot 和 MongoDB 的一致性
- python-3.x - 如何绘制方程
- coq - 如何增加列表中的标题
- pytest - Proboscis 可以与 Pytest 一起使用吗?
- c# - 验证 2 个组件具有相同的值
- reactjs - 如果找不到模块,如何捕获错误?
- c++ - 使用 reinterpret_cast c++ 后的意外行为
- webpack - 使用 SSR 与 IE11 反应 Redux - 客户端应用程序无法启动