mqtt - Spring Mqtt 从上次读取位置订阅
问题描述
我正在使用弹簧集成 Mqtt。每当我重新启动应用程序时,它都会从第一个位置开始使用消息。是否有任何属性可以使应用程序从上次读取或当前位置读取。
我找不到它。
如果没有这样的选项,那么如果下线部署了新版本并且应用程序从第一个位置开始使用,就会出现问题
下面是我用来测试的代码
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(),
batteryLevel, locationChange);
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
adapter.setManualAcks(false);
return adapter;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { serverAddress });
options.setCleanSession(false);
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
LOGGER.info("topic : " +topic);
LOGGER.info(message.getPayload().toString())
}
}
解决方案
推荐阅读
- android - 从模板生成新对象时如何将 template.xml 参数值转换为小写?
- vue.js - 如何在 vuex 和 quasar 应用程序中使用 createPersistedState?
- python - TensorFlow: AttributeError when calling TensorFlow's IO functions; why are they missing?
- c# - 编写DAO类来填充一对多关系
- node.js - 我有一个带有链式承诺的函数,之后我想返回一个整数值,但在函数解析之前我得到了 undefined
- oracle-data-integrator - 多个用户在同一张桌子上发出问题
- angularjs - 如何为 anguluarjs 应用程序定义 robots.txt
- symfony - 如何修复 Voter 必须实现 VoterInterface 错误
- python - 如何在 python 3 中编辑 xml 配置文件?
- angular - http-interceptor 等待订阅完成获取数据