apache-kafka - 为主题动态创建消息监听器
问题描述
我正在分析一个关于创建可以部署在多个微服务中的通用消费者库的问题(它们都是基于弹簧的)。要求有大约 15-20 个主题来监听。如果我们使用基于注释的 kafka 监听器,我们需要为每个微服务添加更多代码。有什么方法可以让我们根据某个xml文件动态创建消费者,每个消费者都可以注入这些数据
- 话题
- 群号
- 分割
- 过滤器(如果有)
加上注解,设计很死板。我唯一能想到的办法就是,我们可以在解析xml config后创建messagelisteners,每个topic会有自己的concurrentmessagelistenercontainer。
有没有其他更好的方法可以使用 spring ?
PS:我对 spring 和 kafka 有点陌生。如果在解释要求时有混淆,请告诉我
谢谢, 拉贾塞卡
解决方案
也许您可以使用主题模式。看看消费者属性。例如听者
@KafkaListener(topicPattern = "topic1|topic2")
会听topic1
和topic2
。
如果您需要动态创建侦听器,则必须格外小心,因为您必须关闭它。
我会使用与 spring 类似的方法KafkaListenerAnnotationBeanPostProcessor
。这个后处理器负责处理@KafkaListener
s。
以下是它如何工作的建议:
public class DynamicEndpointRegistrar {
private BeanFactory beanFactory;
private KafkaListenerContainerFactory<?> containerFactory;
private KafkaListenerEndpointRegistry endpointRegistry;
private MessageHandlerMethodFactory messageHandlerMethodFactory;
public DynamicEndpointRegistrar(BeanFactory beanFactory,
KafkaListenerContainerFactory<?> containerFactory,
KafkaListenerEndpointRegistry endpointRegistry, MessageHandlerMethodFactory messageHandlerMethodFactory) {
this.beanFactory = beanFactory;
this.containerFactory = containerFactory;
this.endpointRegistry = endpointRegistry;
this.messageHandlerMethodFactory = messageHandlerMethodFactory;
}
public void registerMethodEndpoint(String endpointId, Object bean, Method method, Properties consumerProperties,
String... topics) throws Exception {
KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();
registrar.setBeanFactory(beanFactory);
registrar.setContainerFactory(containerFactory);
registrar.setEndpointRegistry(endpointRegistry);
registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
MethodKafkaListenerEndpoint<Integer, String> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setBeanFactory(beanFactory);
endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
endpoint.setId(endpointId);
endpoint.setGroupId(consumerProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
endpoint.setBean(bean);
endpoint.setMethod(method);
endpoint.setConsumerProperties(consumerProperties);
endpoint.setTopics(topics);
registrar.registerEndpoint(endpoint);
registrar.afterPropertiesSet();
}
}
然后,您应该能够动态注册侦听器。例如
DynamicEndpointRegistrar dynamicEndpointRegistrar = ...;
MyConsumer myConsumer = ...; // create an instance of your consumer
Properties properties = ...; // consumer properties
// the method that should be invoked
// (the method that's normally annotated with KafkaListener)
Method method = MyConsumer.class.getDeclaredMethod("consume", String.class);
dynamicEndpointRegistrar.registerMethodEndpoint("endpointId", myConsumer, method, properties, "topic");
推荐阅读
- jsf - 在对话框中单击“是”后不填充对象
- google-cloud-platform - 使用 google cloud build 成功部署构建后向用户发送电子邮件通知
- python - 从 pandas df 中的 URL 生成的词云 - 220 篇文章一个词云,而不是每篇文章一个词云
- algorithm - 通过比较算法选出的 n 个最佳项目
- codeigniter - 文件不会在 CodeIgniter 中上传
- karate - 空手道 UI:使用 driver.quit() 时数据驱动测试无法正常工作
- python - Reverse one-hot encoded labels in XGBoost?
- java - 通过 JNI 将 std::byte* 传递给 java
- if-statement - 将 if 条件与鱼壳中的 contains 函数相结合
- c# - C#如何用委托替换事件?