首页 > 解决方案 > 在 Spring Kafka 中使用另一个消费者的值触发一个 Kafka 消费者

问题描述

我有一个调度程序,它产生一个事件。我的消费者消费了这个事件。此事件的有效负载是具有以下字段的 json:

private String topic;
private String partition;
private String filterKey;
private long CustId;  

现在我需要再触发一个消费者,它将获取我从第一个消费者那里得到响应的所有这些信息。

@KafkaListener(topics = "<**topic-name-from-first-consumer-response**>", groupId = "group" containerFactory = "kafkaListenerFactory")
    public void consumeJson(List<User> data, Acknowledgment acknowledgment,
                            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                            @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
                            // consumer code goes here...}

我需要创建一些可以代替主题名称传递的动态变量。

同样,我在配置文件中使用过滤,我需要在配置中动态传递密钥。

factory.setRecordFilterStrategy(new RecordFilterStrategy<String, Object>() {
            @Override
            public boolean filter(ConsumerRecord<String, Object> consumerRecord) {
                if(consumerRecord.key().equals("**Key will go here**")) {
                    return false;
                }
                else {
                    return true;
                }
            }

        });

我们如何从第一个消费者的响应中动态注入这些值并触发第二个消费者。两个消费者都在同一个应用程序中

标签: javaspring-bootapache-kafkaspring-kafka

解决方案


您不能使用带注释的侦听器来做到这一点,该配置仅在初始化期间使用;您需要自己创建一个侦听器容器(使用ConcurrentKafkaListenerContainerFactory)来动态创建一个侦听器。

编辑

这是一个例子。

@SpringBootApplication
public class So69134055Application {

    public static void main(String[] args) {
        SpringApplication.run(So69134055Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so69134055").partitions(1).replicas(1).build();
    }

}

@Component
class Listener {

    private static final Logger log = LoggerFactory.getLogger(Listener.class);

    private static final Method otherListen;

    static {
        try {
            otherListen = Listener.class.getDeclaredMethod("otherListen", List.class);
        }
        catch (NoSuchMethodException | SecurityException ex) {
            throw new IllegalStateException(ex);
        }
    }

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    private final MessageHandlerMethodFactory methodFactory;

    private final KafkaAdmin admin;

    private final KafkaTemplate<String, String> template;

    public Listener(ConcurrentKafkaListenerContainerFactory<String, String> factory, KafkaAdmin admin,
            KafkaTemplate<String, String> template, KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp) {

        this.factory = factory;
        this.admin = admin;
        this.template = template;
        this.methodFactory = bpp.getMessageHandlerMethodFactory();
    }

    @KafkaListener(id = "so69134055", topics = "so69134055")
    public void listen(String topicName) {
        try (AdminClient client = AdminClient.create(this.admin.getConfigurationProperties())) {
            NewTopic topic = TopicBuilder.name(topicName).build();
            client.createTopics(List.of(topic)).all().get(10, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            log.error("Failed to create topic", e);
        }
        ConcurrentMessageListenerContainer<String, String> container =
                this.factory.createContainer(new TopicPartitionOffset(topicName, 0));
        BatchMessagingMessageListenerAdapter<String, String> adapter =
                new BatchMessagingMessageListenerAdapter<>(this, otherListen);
        adapter.setHandlerMethod(new HandlerAdapter(
                this.methodFactory.createInvocableHandlerMethod(this, otherListen)));
        FilteringBatchMessageListenerAdapter<String, String> filtered =
                new FilteringBatchMessageListenerAdapter<>(adapter, record -> !record.key().equals("foo"));
        container.getContainerProperties().setMessageListener(filtered);
        container.getContainerProperties().setGroupId("group.for." + topicName);
        container.setBeanName(topicName + ".container");
        container.start();
        IntStream.range(0, 10).forEach(i -> this.template.send(topicName, 0, i % 2 == 0 ? "foo" : "bar", "test" + i));
    }

    void otherListen(List<String> others) {
        log.info("Others: {}", others);
    }

}
spring.kafka.consumer.auto-offset-reset=earliest

输出 - 显示过滤器已应用于键中的记录bar

Others: [test0, test2, test4, test6, test8]

推荐阅读