首页 > 解决方案 > ZeroMQ + NodeJs:订阅者未收到消息

问题描述

我的发布者 ( zpub.js) 在循环中发布,如下所示。

async function publishLoop() {
    let payload = []
    _.forEach(array, (a) => {
       // process a here to generate someKey and someValue
       payload.push({someKey:someValue})
    })

  return Promise.all(payload.map(async (p) => {
    await zmqp.publish({t:'topicString', m:p})
  }))
}

zmqp.publish简直就是以下。

async publish(payload) {
    // this.sock is just a bind to tcp://127.0.0.1:4030
    await this.sock.send([payload.t, JSON.stringify(payload.m, null, null)])
    return Promise.resolve()
}

我的订阅者 ( zsub.js) 是 ZeroMQ 网站上的代码版本。

const zmq = require("zeromq")
const mom = require('moment-timezone')

async function run() {
  const sock = new zmq.Subscriber

  sock.connect("tcp://127.0.0.1:4030")
  sock.subscribe("topicString")

  for await (const [topic, msg] of sock) {
    console.log(`${mom().tz('Asia/Kolkata').format('YYYY-MM-DDTHH:mm:ss.SSS')}`)
  }
}

run()
  1. 我以node zsub.js > out.
  2. 我以node zpub.js. 成功接收所有消息。
  3. zpub.js过程结束但zsub.js继续运行。当我重新运行node zpub.js时,订阅者没有收到一条消息。中的记录数out保持不变。
  4. 再次运行zpub.js一次或两次似乎会将消息(最近的消息;而不是时间戳所看到的较早消息)传递给订阅者。

因此,我不确定在 pub/sub 端要做什么,以免消息“丢失”。请指教。

标签: javascriptnode.jsmessagezeromqpublish-subscribe

解决方案


Pub-Sub 本质上是不可靠的,因为订阅者没有反馈给发布者以确认已收到消息。该指南对此进行了详细描述并提供了一些解决方案。解决方案之一是不使用 Pub-Sub,而是使用 Router-Dealer。这是否是可行的替代方案取决于您的用例。

关于您的具体问题,订阅者最终确定与发布者的连接丢失,并将尝试重新连接,直到重新建立连接。根据时间的不同,订阅者可能会错过发布者发送的初始(或所有)消息。

一般来说,如果发布者是通信的稳定部分(如服务器一样保持在线状态)并且订阅者可以进出(如客户端),则 Pub-Sub 效果最佳。


推荐阅读