node.js - MQTT 消息事件中的异步函数
问题描述
我在 Node 应用程序中使用 MQTTjs 模块来订阅 MQTT 代理。我希望在收到新消息后,使用异步函数将它们存储在 MongoDB 中。我的代码是这样的:
client.on('message', (topic, payload, packet) => {
(async () => {
await msgMQTT.handleMQTT_messages(topic, payload, process.env.STORAGE,
MongoDBClient)
})
})
但我不明白为什么它不起作用,即它执行异步函数但任何 MongoDB 查询返回而不被执行。显然没有发出错误。
我错过了什么?
解决方案
我修改了以下代码:
client.on('message', (topic, payload, packet) => {
try {
msgMQTT.handleMQTT_messages(topic, payload, process.env.STORAGE,
MongoDBClient, db)
} catch (error) {
console.error(error)
}
})
在哪里:
exports.handleMQTT_messages = (topic, payload, storageType, mongoClient, db) => {
const dateFormat = 'YYYY-MMM-dddd HH:mm:ss'
// topic is in the form
//
const topics = topic.split('/')
// locations info are at second position after splitting by /
const coord = topics[2].split(",")
// build up station object containing GeoJSON + station name
//`Station_long${coord[0]}_lat${coord[1]}`
const stationObj = getStationLocs(coord.toString())
const msg = JSON.parse(payload)
// what follows report/portici/
const current_topic = topics.slice(2).join()
let data_parsed = null
// parse only messages having a 'd' property
if (msg.hasOwnProperty('d')) {
console.log(`${moment().format(dateFormat)} - ${stationObj.name} (topic:${current_topic})\n `)
data_parsed = parseMessages(msg)
// date rounded down to the nearest hour
// https://stackoverflow.com/questions/17691202/round-up-round-down-a-momentjs-moment-to-nearest-minute
dateISO_String = moment(data_parsed.t).startOf('hour').toISOString();
// remove AQ from station name using regex
let station_number = stationObj.name.match(/[^AQ]/).join('')
let data_to_save = {
id: set_custom_id(stationObj.name, dateISO_String),
//`${station_number}${moment(dateISO_String).format('YMDH')}`,
date: dateISO_String,
station: stationObj,
samples: [data_parsed]
}
switch (storageType) {
case 'lowdb':
update_insertData(db, data_to_save, coll_name)
break;
case 'mongodb': // MongoDB Replicaset
(async () => {
updateIoTBucket(data_to_save, mongoClient, db_name, coll_name)
})()
break;
default: //ndjson format
(async () => {
await fsp.appendFile(process.env.PATH_FILE_NDJSON,
JSON.stringify(data_to_save) + '\n')
})()
//saveToFile(JSON.stringify(data_to_save), process.env.PATH_FILE_NDJSON)
break;
}
// show raw messages (not parsed)
const show_raw = true
const enable_console_log = true
if (msg && enable_console_log) {
if (show_raw) {
console.log('----------RAW data--------------')
console.log(JSON.stringify(msg, null, 2))
console.log('--------------------------------')
}
if (show_raw && data_parsed) {
console.log('----------PARSED data-----------')
console.log(JSON.stringify(data_parsed, null, 2))
console.log('--------------------------------')
}
}
}
}
只有updateIoTBucket(data_to_save, mongoClient, db_name, coll_name)
使用 mgongodb 驱动程序异步执行。
推荐阅读
- javascript - 单击空时输入值。有人可以解释为什么吗?
- mysql - 如何使用 Go 获取切片中的值?
- javascript - Discord.js - 向音乐系统添加队列功能
- git - 从 Windows Git bash 终端打开 Pycharm 后保持打开状态
- xamarin - Xamarin Shared Mono Runtime 中的“共享”一词的真正含义是什么?
- node.js - 计算器等待用户输入 (Discord.js)
- reactjs - COTURN + Kurento MediaServer Trickle IceConnectionState 在一个选项卡上“已连接”,在另一个选项卡上“检查”
- angular - 如何在Angular中制作滑块范围日期
- python - groupby 后跟一个函数
- c - 如何确定一个进程是否在容器中运行(在 linux 中)?