首页 > 解决方案 > NodeJS + MQTT - 以 1 毫秒的频率发布会导致 EventEmitter 内存泄漏

问题描述

我想模拟大量消息,所以我在我的index.js. 我正在使用mqtt模块。

var client = mqtt.connect('mqtts://broker.com:8883', {
  ca: [fs.readFileSync('broker.pem')],
  rejectUnauthorized: false
})

let frequency = process.env.FREQUENCY || 1000
client.on('connect', function () {
  let count = 0
  setInterval(() => {
    process.stdout.clearLine()
    process.stdout.cursorTo(0)
    client.publish('/topic/string', JSON.stringify(getPayload()), (err) => {
      if (err) {

      } else {
        ++count
        process.stdout.write('Messages published :' + count)
      }
    })
  }, frequency)
})

当我运行它时export FREQUENCY=1 && node index.js,我得到这个错误。

nsubrahm@nsubrahm simulator % node index.js     
(node:2313) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 1001 drain listeners added. Use emitter.setMaxListeners() to increase limit
MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 1001 drain listeners added. Use emitter.setMaxListeners() to increase limit
    at _addListener (events.js:243:17)
    at TLSSocket.addListener (events.js:259:10)
    at TLSSocket.Readable.on (_stream_readable.js:799:35)
    at TLSSocket.once (events.js:290:8)
    at sendPacket (/my/simulator/node_modules/mqtt/lib/client.js:99:19)
    at MqttClient._sendPacket (/my/simulator/node_modules/mqtt/lib/client.js:1061:7)
    at MqttClient.publish (/my/simulator/node_modules/mqtt/lib/client.js:541:14)
    at Timeout.setInterval [as _onTimeout] (/my/simulator/index.js:53:12)
    at ontimeout (timers.js:436:11)
    at tryOnTimeout (timers.js:300:5)

我什至尝试运行 5 个实例,index.js每个实例的频率为 5 秒;所以总的来说,我可以看到接近每 1 毫秒的负载行为。同样的错误。

如何模拟 MQTT 以 1ms 频率发布的客户端?

标签: node.jsmqttload-testingpublish-subscribeeventemitter

解决方案


似乎代码不是问题,即使将频率变量设置为 1 它也对我有用,因为我可以在控制台中看到没有警告的消息,也许像 getPayload() 这样的代码的其他部分正在创建很多侦听器并且可能导致警告。

尝试限制可以使用客户端变量创建的侦听器并设置一个简单的文本,client.publish('/topic/string', JSON.stringify(getPayload()),如下所示:

...
var client = mqtt.connect('mqtts://broker.com:8883', {
  ca: [fs.readFileSync('broker.pem')],
  rejectUnauthorized: false
})
/** This will avoid the MaxListenerExceededWarning */
client.setMaxListeners(100)
...

...
// Do not use JSON.stringify(getPayLoad())
client.publish('/topic/string', 'JUST_TYPE_THIS', (err) => { ...
...

这个问题在这里解释

更新

避免内存泄漏警告的代码段

var mqtt = require('mqtt')
// Changed URL
var client  = mqtt.connect('mqtt://test.mosquitto.org')

// IMPORTANT: Limit listeners
client.setMaxListeners(100)

// Changed ms to 1 ms for this example
//let frequency = process.env.FREQUENCY || 1000
let frequency = 1

client.on('connect', function () {
  let count = 0
  setInterval(() => {
    process.stdout.clearLine()
    process.stdout.cursorTo(0)
    // Set harcoded key-value but you need to set
    // real key-value
    client.publish('/topic/string', 'key-value-example', (err) => {
      if (err) {

      } else {
        ++count
        process.stdout.write('Messages published :' + count)
      }
    })
  }, frequency)
})


推荐阅读