首页 > 解决方案 > 使用 MQTT.js 和 Mosquitto 发布和订阅 MQTT 主题

问题描述

我正在尝试使用 mosquitto(在 VM 内sudo apt-get install mosquitto)使用 node.js 和此mqtt.js异步库发布/订阅 MQTT 消息:https ://github.com/mqttjs/async-mqtt

在我的本地 PC 上安装 mosquitto CLI 发布者/订阅者客户端后,sudo apt-get install mosquitto-clients我知道它们可以工作,因为我可以使用以下命令成功监视发布者/订阅者会话:

我可以看到从发布者到订阅者的消息,但是当开始使用 Node.js 发送消息时,我在订阅者 CLI 会话中再也看不到它了

我假设具有不同 ID 的多个发布者可以向同一主题发送消息,并且多个订阅者可以通过来自同一主题的 ID 进行过滤。我认为这是可能的,但以下代码不起作用的部分原因可能是我需要妥善处理 ID/主题组合?

这是mocha我执行以尝试发送读数的规范

const MQTT = require("async-mqtt");

const consoleLogError = (err) => {
  if (err.response) {
    console.error(`${new Date().toISOString()} HTTP response error, ${err.response.status}: ${err.response.statusText}`);
  } else {
    console.error(`${new Date().toISOString()} No HTTP error, the stack: ${new Error(err).stack}`);
  }
};

const consoleLog = (msg) => {
  console.log(`${new Date().toISOString()} ${msg}`);
};

// {"fooMetric":42.42, "created_at":"2018-12-24T10:42:08.057Z"}
const generateReadingMsg = () => {
  const now = new Date();
  const msg = {
    "fooMetric": 42.42,
    "created_at": now.toISOString()
  };
  consoleLog(`New generated reading: ${JSON.stringify(msg)}`);
  return msg;
};

const mqttSession = {};

mqttSession.asyncInit = (hostPort, deviceId, mqttTopic) => {
  return new Promise((resolve, reject) => {
    mqttSession.mqttTopic = mqttTopic;
    mqttSession.client = MQTT.connect(`mqtts://${hostPort}`, {
      keepalive: 10,
      clientId: deviceId,
      protocolId: 'MQTT',
      clean: false,
      protocolVersion: 4,
      reconnectPeriod: 1000,
      connectTimeout: 30 * 1000,
      rejectUnauthorized: false,
    });
    return resolve();
  });
};

mqttSession._send = (msgStr) => {
  return Promise.resolve()
    .then(() => {
      return mqttSession.client.publish(mqttSession.mqttTopic, msgStr);
    })
    .then(() => {
      return mqttSession.client.end();
    })
    .catch((err) => {
      consoleLogError(err);
      throw err;
    });
}

mqttSession.asyncSend = (msgJson) => {
  const msgStr = JSON.stringify(msgJson);
  return Promise.resolve()
    .then(() => {
      mqttSession.client.on("connect", () => {
        return mqttSession._send(msgStr);
      });
    })
    .catch((err) => {
      consoleLogError(err);
      throw err;
    });
};

describe.only('MQTT readings', () => {

  // for the IP address check the VM details
  const vm_ip = "xxx.xxx.xxx.xxx";

  beforeEach(() => {
    return Promise.all([
        mqttSession.asyncInit(`${vm_ip}:1883`, "fooId", "readings")
      ]);
  });

  it('should send a reading to the MQTT broker', () => {
    console.log(`TODO run "mosquitto_sub -h ${vm_ip} -p 1883 -t "readings" -v -d"`);
    console.log(`The following MQTT-send should be equivalent to: "mosquitto_pub -h ${vm_ip} -p 1883 -t "readings" -i foo001 -m '{"deviceId":"foo001","fooMetric":42.42}' -d"`)
    return mqttSession.asyncSend(generateReadingMsg())
      .then(stuff => {
        console.log(`returned stuff from the MQTT session: ${stuff}`);
        return Promise.resolve();
      })
      .catch(error => {
        consoleLogError(error);
        throw error;
      });
  });

});

标签: node.jsmqttmosquitto

解决方案


首先,您无法在 MQTT *协议级别识别哪个客户端在主题上发布了给定消息。该信息在任何协议级别信息中都不存在。如果您需要该信息,则需要将其包含在您发送的消息的有效负载中,并在消息传递后对其进行过滤。

As for the code, you are trying to connect to a secure MQTT broker using mqtts://

mqttSession.client = MQTT.connect(`mqtts://${hostPort}`, {

Where as unless you have specifically configured Mosquitto in your VM it will be running normal unsecured MQTT on port 1883

If you delete the s the code runs fine against my broker.

mqttSession.client = MQTT.connect(`mqtt://${hostPort}`, {

* This MQTT v3.x, with the new MQTT v5.0 spec there is the option to add extra meta data, but again you would not be able to filter at subscription time, only once the message had been delivered.


推荐阅读