首页 > 解决方案 > 记录过滤策略问题(春季启动:2.3.8)。过滤后的消息一次又一次地进入过滤器

问题描述

我正在研究 spring kafka 批处理侦听器过滤策略。我面临一个问题,过滤的事件一次又一次地出现。有人可以帮我解决这个问题吗?带有kafka版本的弹簧靴(2.3.8)

这是我的配置:

    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new 
                  ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);

    factory.setBatchListener(true);
    factory.setAckDiscarded(true);
    factory.getContainerProperties().setIdleBetweenPolls(30000);
    factory.setRecordFilterStrategy(
            (consumerRecord) -> { 
                MyObject myObject = new ObjectMapper().readValue(consumerRecord.value(), MyObj.class);
                if (myObject.frequency > 10) {
                        return false;
                } else {  
                  return true;
                }});
   factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
   

标签: spring-kafka

解决方案


当使用带有手动确认的批处理模式时,如果您过滤所有记录(全部丢弃),侦听器将获得一个空列表,因此您仍然可以确认批处理以提交偏移量。

我刚刚对其进行了测试,它按预期工作。

@SpringBootApplication
public class So67259790Application {

    public static void main(String[] args) {
        SpringApplication.run(So67259790Application.class, args);
    }

    @KafkaListener(id = "so67259790", topics = "so67259790")
    public void listen(List<String> in, Acknowledgment ack) {
        System.out.println(in);
        ack.acknowledge();
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so67259790").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so67259790", "foo");
            template.send("so67259790", "bar");
        };
    }

    @Bean
    public RecordFilterStrategy<Object, Object> rfs() {
        return rec -> true;
    }

}

推荐阅读