首页 > 解决方案 > Spring Mqtt 从上次读取位置订阅

问题描述

我正在使用弹簧集成 Mqtt。每当我重新启动应用程序时,它都会从第一个位置开始使用消息。是否有任何属性可以使应用程序从上次读取或当前位置读取。

我找不到它。

如果没有这样的选项,那么如果下线部署了新版本并且应用程序从第一个位置开始使用,就会出现问题

下面是我用来测试的代码

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(),
                        batteryLevel, locationChange);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        adapter.setManualAcks(false);
        return adapter;
    }
    
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { serverAddress });
        options.setCleanSession(false);
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
                LOGGER.info("topic : " +topic);
                LOGGER.info(message.getPayload().toString())
               }
          }

标签: mqttspring-integration-mqtt

解决方案


推荐阅读