java - 收到消息后如何在spring mqtt集成中停止重复订阅保留消息
问题描述
订阅保留主题时获得重复的保留消息。
我在我的物联网项目中使用了 spring mqtt 集成。在这里,一旦收到保留的消息,它就会继续订阅,直到我将空白消息发布到同一主题,并将保留标志设置为 true。我注意到,当我在终端中使用 mqtt 命令执行相同的过程时,订阅保留的主题时,它只订阅一次,不会发生重复订阅。
我使用以下代码通过使用 # 订阅所有主题
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public DefaultMqttPahoClientFactory clientfactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("username");
options.setPassword("password".toCharArray());
options.setCleanSession(false);
//options.setCleanSession(true);
//options.setServerURIs(new String[] { "tcp://localhost" });
options.setServerURIs(new String[] { "url" });
factory.setConnectionOptions(options);
return factory;
}
@Bean
public MqttPahoMessageDrivenChannelAdapter inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("admin",
clientfactory(), "#");
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
/*adapter.setc*/
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
public void handleMessage(Message<?> message) throws MessagingException {
mqttSubscriptionProcessor.processSubscription(message);
}
};
}
我使用此命令发布了保留的消息
mosquitto_pub -u admin -P pwd -t hello/topic -m "test msg" -r -d
eclipse控制台中的结果是
{mqtt_receivedRetained=true, id=48afaec5-debf-4927-ce06-a80556e479ac, mqtt_duplicate=false, mqtt_receivedTopic=hello/topic, mqtt_receivedQos=0, timestamp=1554363853214}
test msg
{mqtt_receivedRetained=true, id=48afaec5-debf-4927-ce06-a80556e479ac, mqtt_duplicate=false, mqtt_receivedTopic=hello/topic, mqtt_receivedQos=0, timestamp=1554363853214}
test msg
{mqtt_receivedRetained=true, id=48afaec5-debf-4927-ce06-a80556e479ac, mqtt_duplicate=false, mqtt_receivedTopic=hello/topic, mqtt_receivedQos=0, timestamp=1554363853214}
test msg
{mqtt_receivedRetained=true, id=48afaec5-debf-4927-ce06-a80556e479ac, mqtt_duplicate=false, mqtt_receivedTopic=hello/topic, mqtt_receivedQos=0, timestamp=1554363853214}
test msg
这里我只需要订阅一次保留的主题,必须更改 spring 集成代码中的任何更改。
解决方案
这就是保留消息的工作方式,当客户端订阅匹配主题时,将始终将保留位设置为主题的最后一条发布的消息在任何新消息之前首先传递给客户端。
如果您不希望保留消息(因此始终传递),则不要在发布时设置保留位。
否则,正如您所发现的,您可以通过发布带有空负载并将保留位设置为同一主题的消息来清除主题的保留消息。
或者您可以过滤客户端中的消息,因为您始终可以在消息传递时检查是否在消息上设置了保留标志。
至于弹簧方面,您似乎正在创建 4 个客户端,因此每个客户端都在订阅时接收消息。您可以通过查看代理日志来证明这一点,如果您以详细模式运行 mosquitto,它将显示它传递的每条消息。
推荐阅读
- flutter - Flutter 是否可以在没有 Google Play 服务的情况下工作?
- mysql - 如何删除 MySQL Query 中的重复行
- python - 在一个查询中创建模型的多个实例
- android - 分离时如何在片段中保存webview的状态?
- angular - 在进行 http 请求时使用 Observables 和 Subjects
- android - xml上图片位置的问题
- azure - AADSTS7000222:提供的客户端密钥已过期
- c# - EF 核心 3.1:使用即时加载加载相关实体时是否应该初始化列表导航属性?
- javascript - 将 NodeJS 版本从 Node10 LTS 升级到 Node 14
- jenkins - 如何自动删除 Jenkins Job?