首页 > 解决方案 > 客户端断开连接时的 Paho MQTT 消息可靠性

问题描述

我正在开发一个 MQTT kafka 源连接器。消息来自 MQTT Publisher,然后连接器将订阅 MQTT 主题(/iot/sensor/#),这些消息将发布到 Kafka 代理。但与此同时,如果我停止连接器并再次重新启动连接器,则连接器正在消耗来自 mqtt 代理的消息,但缺少一些 mqtt 消息(如果我从 MQTT 发布者发送 1000 条消息,但在 kafka 中没有收到 1000 条消息) 我给出mqtt ClientID 是唯一名称。我使用 setCleanSession=false,QOS=1 设置 mqtt paho 客户端属性并使用 MqttDefaultFilePersistence(DIR)。Rabbitmq 版本是 3.6.10

伪代码

Public class MyTask extends SourceTask implements MqttCallback{
//Initialzied the queue
@Override
    public void start(Map<String, String> map) {
    //set necessary properties and configuration
    //set the required mqttConnect options and mqtt broker
     Paho mClient = new MqttClient(mqttURl,Unique ClientID, new MqttDefaultFilePersistence(DIR))
     mClient.connect(RequiredConnectProperties)
    }
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
    //poll the queue
    // publish messages to kafka
    }
    @Override
    public void stop() {
    //disconnect the mqtt paho clent
    }
    @Override
    public void messageArrived(String mqtttopic, MqttMessage message) throws Exception {
    //add the messages to queue

    }
}

标签: javaapache-kafkamqttpaho

解决方案


推荐阅读