java - 客户端断开连接时的 Paho MQTT 消息可靠性
问题描述
我正在开发一个 MQTT kafka 源连接器。消息来自 MQTT Publisher,然后连接器将订阅 MQTT 主题(/iot/sensor/#),这些消息将发布到 Kafka 代理。但与此同时,如果我停止连接器并再次重新启动连接器,则连接器正在消耗来自 mqtt 代理的消息,但缺少一些 mqtt 消息(如果我从 MQTT 发布者发送 1000 条消息,但在 kafka 中没有收到 1000 条消息) 我给出mqtt ClientID 是唯一名称。我使用 setCleanSession=false,QOS=1 设置 mqtt paho 客户端属性并使用 MqttDefaultFilePersistence(DIR)。Rabbitmq 版本是 3.6.10
伪代码
Public class MyTask extends SourceTask implements MqttCallback{
//Initialzied the queue
@Override
public void start(Map<String, String> map) {
//set necessary properties and configuration
//set the required mqttConnect options and mqtt broker
Paho mClient = new MqttClient(mqttURl,Unique ClientID, new MqttDefaultFilePersistence(DIR))
mClient.connect(RequiredConnectProperties)
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
//poll the queue
// publish messages to kafka
}
@Override
public void stop() {
//disconnect the mqtt paho clent
}
@Override
public void messageArrived(String mqtttopic, MqttMessage message) throws Exception {
//add the messages to queue
}
}
解决方案
推荐阅读
- adobe - 在哪里可以找到 adobe Dreamweaver 测验?
- java - 来自java的Amazon cognito未经身份验证的流
- docker - 使用 docker-compose 运行 Edgex-UI-GO
- eclipse - 插件片段看不到宿主插件中的包
- css - 小心!Object-fit:Cover 在最新的 Chrome 更新中不起作用,是错误还是他们放弃了支持?
- python - 即使在更改之后,Python 脚本语法错误
- python - 自定义虚线 matplotlib
- model - @Model 在当前上下文中不存在
- python - 在 python 中使用 itertools 的组合没有给出正确的组合
- python - Laravel:致命的 Python 错误:_Py_HashRandomization_Init:无法获取随机数来初始化 Python Python 运行时状态:预初始化