首页 > 解决方案 > 为主题动态创建消息监听器

问题描述

我正在分析一个关于创建可以部署在多个微服务中的通用消费者库的问题(它们都是基于弹簧的)。要求有大约 15-20 个主题来监听。如果我们使用基于注释的 kafka 监听器,我们需要为每个微服务添加更多代码。有什么方法可以让我们根据某个xml文件动态创建消费者,每个消费者都可以注入这些数据

  1. 话题
  2. 群号
  3. 分割
  4. 过滤器(如果有)

加上注解,设计很死板。我唯一能想到的办法就是,我们可以在解析xml config后创建messagelisteners,每个topic会有自己的concurrentmessagelistenercontainer。

有没有其他更好的方法可以使用 spring ?

PS:我对 spring 和 kafka 有点陌生。如果在解释要求时有混淆,请告诉我

谢谢, 拉贾塞卡

标签: apache-kafkaspring-kafka

解决方案


也许您可以使用主题模式。看看消费者属性。例如听者

@KafkaListener(topicPattern = "topic1|topic2")

会听topic1topic2

如果您需要动态创建侦听器,则必须格外小心,因为您必须关闭它。

我会使用与 spring 类似的方法KafkaListenerAnnotationBeanPostProcessor。这个后处理器负责处理@KafkaListeners。

以下是它如何工作的建议:

public class DynamicEndpointRegistrar {

    private BeanFactory beanFactory;
    private KafkaListenerContainerFactory<?> containerFactory;
    private KafkaListenerEndpointRegistry endpointRegistry;
    private MessageHandlerMethodFactory messageHandlerMethodFactory;

    public DynamicEndpointRegistrar(BeanFactory beanFactory,
            KafkaListenerContainerFactory<?> containerFactory,
            KafkaListenerEndpointRegistry endpointRegistry, MessageHandlerMethodFactory messageHandlerMethodFactory) {
        this.beanFactory = beanFactory;
        this.containerFactory = containerFactory;
        this.endpointRegistry = endpointRegistry;
        this.messageHandlerMethodFactory = messageHandlerMethodFactory;
    }

    public void registerMethodEndpoint(String endpointId, Object bean, Method method, Properties consumerProperties,
            String... topics) throws Exception {
        KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();
        registrar.setBeanFactory(beanFactory);
        registrar.setContainerFactory(containerFactory);
        registrar.setEndpointRegistry(endpointRegistry);
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory);

        MethodKafkaListenerEndpoint<Integer, String> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setBeanFactory(beanFactory);
        endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory);

        endpoint.setId(endpointId);
        endpoint.setGroupId(consumerProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
        endpoint.setBean(bean);
        endpoint.setMethod(method);

        endpoint.setConsumerProperties(consumerProperties);
        endpoint.setTopics(topics);

        registrar.registerEndpoint(endpoint);
        registrar.afterPropertiesSet();
    }

}

然后,您应该能够动态注册侦听器。例如

DynamicEndpointRegistrar dynamicEndpointRegistrar = ...;
MyConsumer myConsumer = ...; // create an instance of your consumer
Properties properties = ...; // consumer properties

// the method that should be invoked
// (the method that's normally annotated with KafkaListener)
Method method = MyConsumer.class.getDeclaredMethod("consume", String.class);

dynamicEndpointRegistrar.registerMethodEndpoint("endpointId", myConsumer, method, properties, "topic");

推荐阅读