spring-integration - SI 订阅多个 mqtt 主题
问题描述
我正在尝试学习如何在Spring-Integration中处理MQTT 消息。已创建一个转换器,它为每个 MQTT 主题订阅一个 MqttPahoMessageDrivenChannelAdapter 以使用和转换消息。
问题是我们的数据提供者正计划“加速”发布消息。因此,不是有几个(<=10)主题,每个主题都有大约 150 个字段的消息,而是计划将这些字段中的每一个发布到单独的 MQTT 主题。
这意味着我的转换器将不得不消耗 ca。1000个mqtt主题,但不知道是否:
- spring-integration 是否仍然是一个不错的选择。引起afaik。提到的适配器使用 PAHO MqttClient,它将在一个线程中使用它订阅的所有主题的消息,并且创建这些适配器的 1000 个实例是一种过度杀伤力。
- 如果我们进一步坚持使用 spring-integration 并使用提供的组件,那么为所有字段创建一个单一的入站适配器是否是一个好主意,这些字段以前在一个主题的消息中,但将转换从适配器 bean 转移到一个单独的 bean(进行转换)通过执行器通道连接到适配器,从而在某个线程池上并行执行这些字段的转换。
提前感谢您的回答!
解决方案
我觉得你的想法很有道理。
为此,您需要实现直通 MqttMessageConverter
并提供MqttMessage
as apayload
和topic
标头:
public class PassThroughMqttMessageConverter implements MqttMessageConverter {
@Override
public Message<?> toMessage(String topic, MqttMessage mqttMessage) {
return MessageBuilder.withPayload(mqttMessage)
.setHeader(MqttHeaders.RECEIVED_TOPIC, topic)
.build();
}
@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
return null;
}
@Override
public Message<?> toMessage(Object payload, MessageHeaders headers) {
return null;
}
}
ExecutorChannel
因此,在 custom中提到之后,您真的可以在下游执行目标转换transformer
。
您也可以考虑实现一个自定义MqttPahoClientFactory
(可能也可以使用的扩展DefaultMqttPahoClientFactory
)并提供一个自定义ScheduledExecutorService
注入到MqttClient
您要在getClientInstance()
.
推荐阅读
- prolog - prolog - findall 排序并返回第一个
- java - 向用户显示他点击了哪个按钮
- php - 如果打开 php 文件,指示 Web 浏览器重定向到 localhost
- javascript - Array.includes() 返回未定义
- c++ - 如何检查两个矩阵是否相同?
- javascript - 模板文字中的数组映射在项目之间呈现额外的逗号
- node.js - firebase 统计在线用户并在全球计数器中更新
- selenium - 在使用 java 的 Selenium Webdriver 中,热点击“获取 adobe Flash Player”图标?
- wordpress - VPS 上 WordPress 的权限
- c - 在C编程中实现堆栈的push方法