Async function in an MQTT message event

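Problem description

I am using the MQTT.js module in a Node application to subscribe to an MQTT broker. When a new message arrives, I want to store it in MongoDB using an async function. My code looks like this: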

client.on('message', (topic, payload, packet) => {

      (async () => {
        await msgMQTT.handleMQTT_messages(topic, payload, process.env.STORAGE,
          MongoDBClient)
      })

    })

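But I can't understand why it doesn't work: the async function seems to be executed, yet every MongoDB query returns without actually running. Apparently no error is raised.
What am I missing?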
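
One thing that stands out in the snippet as posted: the async arrow function is wrapped in parentheses, but the trailing `()` that would invoke it is missing, so the function is only defined and its body never runs. A minimal sketch of the difference follows; `fakeHandler` is a hypothetical stand-in for `msgMQTT.handleMQTT_messages`, which is not shown in the question.

```javascript
// Hypothetical stand-in for msgMQTT.handleMQTT_messages; it only records calls.
const calls = []
const fakeHandler = async (topic) => {
  calls.push(topic)
}

// As posted in the question: the arrow function is created but never invoked.
;(async () => {
  await fakeHandler('not-invoked')
})

// With the trailing () the function runs immediately (an async IIFE).
const done = (async () => {
  await fakeHandler('invoked')
})()

// Only the second call was recorded.
done.then(() => console.log(calls))
```

If this is indeed the cause, nothing is thrown either, which would match the absence of errors described above.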

Tags: node.js, mongodb, mqtt

Solution

I changed the code as follows:

client.on('message', (topic, payload, packet) => {
        try {
            msgMQTT.handleMQTT_messages(topic, payload, process.env.STORAGE,
                MongoDBClient, db)
        } catch (error) {
            console.error(error)
        }
    })

where:

exports.handleMQTT_messages = (topic, payload, storageType, mongoClient, db) => {

    const dateFormat = 'YYYY-MMM-dddd HH:mm:ss'


    // topic is in the form 
    // 
    const topics = topic.split('/')

    // location info is the third segment (index 2) after splitting by /
    const coord = topics[2].split(",")

    // build up station object containing GeoJSON + station name
    //`Station_long${coord[0]}_lat${coord[1]}`
    const stationObj = getStationLocs(coord.toString())
    const msg = JSON.parse(payload)

    // what follows report/portici/
    const current_topic = topics.slice(2).join()
    let data_parsed = null

    // parse only messages having a 'd' property
    if (msg.hasOwnProperty('d')) {
        console.log(`${moment().format(dateFormat)} - ${stationObj.name} (topic:${current_topic})\n `)
        data_parsed = parseMessages(msg)

        // date rounded down to the nearest hour
        // https://stackoverflow.com/questions/17691202/round-up-round-down-a-momentjs-moment-to-nearest-minute
        const dateISO_String = moment(data_parsed.t).startOf('hour').toISOString();

        // remove AQ from station name using regex
        const station_number = (stationObj.name.match(/[^AQ]/g) || []).join('')
        let data_to_save = {
            id: set_custom_id(stationObj.name, dateISO_String),
            //`${station_number}${moment(dateISO_String).format('YMDH')}`,
            date: dateISO_String,
            station: stationObj,
            samples: [data_parsed]
        }
        switch (storageType) {
            case 'lowdb':
                update_insertData(db, data_to_save, coll_name)

                break;

            case 'mongodb': // MongoDB Replicaset
                (async () => {
                    // await so that a failed write rejects this promise and gets logged
                    await updateIoTBucket(data_to_save, mongoClient, db_name, coll_name)
                })().catch(console.error)
                break;


            default: //ndjson format
                (async () => {
                    await fsp.appendFile(process.env.PATH_FILE_NDJSON,
                        JSON.stringify(data_to_save) + '\n')
                })().catch(console.error)
                //saveToFile(JSON.stringify(data_to_save), process.env.PATH_FILE_NDJSON)
                break;
        }

        // show raw messages (not parsed)
        const show_raw = true
        const enable_console_log = true
        if (msg && enable_console_log) {
            if (show_raw) {
                console.log('----------RAW data--------------')
                console.log(JSON.stringify(msg, null, 2))
                console.log('--------------------------------')
            }
            if (show_raw && data_parsed) {
                console.log('----------PARSED data-----------')
                console.log(JSON.stringify(data_parsed, null, 2))
                console.log('--------------------------------')
            }

        }
    }

}
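
To make the topic handling in `handleMQTT_messages` more concrete, here is a small sketch with a made-up topic. The real topic layout is not given above, so `report/portici/14.35,40.81` is purely an assumption chosen to match the `report/portici/` prefix mentioned in the comment. Note that `join()` without an argument uses a comma as the separator.

```javascript
// Hypothetical topic; the actual format is not shown in the answer.
const topic = 'report/portici/14.35,40.81'

const topics = topic.split('/')               // ['report', 'portici', '14.35,40.81']
const coord = topics[2].split(',')            // ['14.35', '40.81']
const current_topic = topics.slice(2).join()  // '14.35,40.81'

console.log(coord, current_topic)
```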

Only updateIoTBucket(data_to_save, mongoClient, db_name, coll_name) runs asynchronously, using the mongodb driver.
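
A caveat on the `try`/`catch` in the listener: a synchronous `try`/`catch` only sees errors thrown before the handler returns, so a rejection from a promise that is not awaited (for example one started inside an async IIFE) would not reach it. One possible way around this is to make the listener itself `async` and `await` inside the `try` block. In the sketch below, `saveMessage` is a hypothetical stand-in for the storage call, `report/test` is a made-up topic, and a plain `EventEmitter` stands in for the MQTT client.

```javascript
const { EventEmitter } = require('events')

// Stand-in for the MQTT client; only the 'message' event is simulated here.
const client = new EventEmitter()
const errors = []

// Hypothetical async storage call that fails for empty payloads.
const saveMessage = async (topic, payload) => {
  if (payload.length === 0) throw new Error(`empty payload on ${topic}`)
  return { topic, size: payload.length }
}

client.on('message', async (topic, payload) => {
  try {
    // await makes a rejection surface as an exception inside this try block
    await saveMessage(topic, payload)
  } catch (error) {
    errors.push(error.message)
    console.error(error.message)
  }
})

// The first message is stored, the second one triggers the catch branch.
client.emit('message', 'report/test', Buffer.from('{"d":1}'))
client.emit('message', 'report/test', Buffer.alloc(0))
```

Whether this fits depends on how `handleMQTT_messages` is written; it would need to return (or await) the promise from `updateIoTBucket` for the listener to observe a failure.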

