首页 > 解决方案 > 不同的消费者有各自的过滤策略

问题描述

在 Spring Boot 应用程序中,我们可以有 2 个具有各自过滤策略的不同消费者吗?如果是这样,我们如何将过滤策略挂钩到相应的消费者?(boot-2.3.8) 注意:这两个属性是不同的。

@KafkaListener(topics = "topic1", groupId = "group-id1",
          properties = {
                  "max.poll.interval.ms: ${max.poll.interval.ms}",
                  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + ":localhost:9092",
                  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + ":false",
                  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG+":org.apache.kafka.common.serialization.StringDeserializer",
                  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG+":org.apache.kafka.common.serialization.StringDeserializer",
                  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG+":${auto.offset.reset}"
          }, autoStartup = "true")
  public void onMessage(
          @Payload ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
       processMessageAndAcknowledge(consumerRecord, acknowledgment);
  }
  
  
   @Bean
    public RecordFilterStrategy<Object, Object> recordFilterStrategyForTopic1() {
        return rec -> true;
    }
    
    @KafkaListener(topics = "topic2", groupId = "group-id2",
          properties = {
                  "max.poll.interval.ms: ${max.poll.interval.ms}",
                  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + ":localhost:9092",
                  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + ":false",
                  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG+":org.apache.kafka.common.serialization.StringDeserializer",
                  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG+":org.apache.kafka.common.serialization.StringDeserializer",
                  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG+":${auto.offset.reset}"
          }, autoStartup = "true")
  public void onMessage(
          @Payload ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
       processMessageAndAcknowledge(consumerRecord, acknowledgment);
  }
  
  @Bean
    public RecordFilterStrategy<Object, Object> recordFilterStrategyForTopic2() {
        return rec -> true;
    }

标签: spring-kafka

解决方案


目前无法在侦听器级别覆盖过滤策略;您将不得不创建两个容器工厂。随意在 GitHub 问题中打开新功能请求。https://github.com/spring-projects/spring-kafka/issues

一种快速的解决方案是编写一个包装器策略,根据ConsumerRecord.

return rec -> rec.topic().equals("topic1") 
    ? recordFilterStrategyForTopic1.filter(rec)
    : recordFilterStrategyForTopic2.filter(rec);

编辑

这是一个完整的例子:

@SpringBootApplication
public class So67391819Application {

    public static void main(String[] args) {
        SpringApplication.run(So67391819Application.class, args);
    }

    @KafkaListener(id = "so67391819-1", topics = "so67391819-1")
    void listen1(String in, @Header(KafkaHeaders.OFFSET) long offset) {
        System.out.println("listener1 " + in + "@" + offset);
    }

    @KafkaListener(id = "so67391819-2", topics = "so67391819-2")
    void listen2(String in, @Header(KafkaHeaders.OFFSET) long offset) {
        System.out.println("listener2 " + in + "@" + offset);
    }

    @Bean
    NewTopic topic1() {
        return TopicBuilder.name("so67391819-1").partitions(1).replicas(1).build();
    }

    @Bean
    NewTopic topic2() {
        return TopicBuilder.name("so67391819-2").partitions(1).replicas(1).build();
    }

    @Bean
    RecordFilterStrategy<String, String> f1() {
        return rec -> rec.offset() % 2 == 0;
    }

    @Bean
    RecordFilterStrategy<String, String> f2() {
        return rec -> rec.offset() % 2 == 1;
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> {
                template.send("so67391819-1", "foo" + i);
                template.send("so67391819-2", "bar" + i);
            });
        };
    }

}

@Component
class FactoryCustomizer {

    FactoryCustomizer(ConcurrentKafkaListenerContainerFactory<String, String> factory,
            RecordFilterStrategy<String, String> f1,
            RecordFilterStrategy<String, String> f2) {

        factory.setRecordFilterStrategy(rec -> rec.topic().endsWith("-1") ? f1.filter(rec) : f2.filter(rec));
    }

}
listener1 foo1@1
listener2 bar0@0
listener1 foo3@3
listener1 foo5@5
listener2 bar2@2
listener1 foo7@7
listener2 bar4@4
listener1 foo9@9
listener2 bar6@6
listener2 bar8@8

推荐阅读