首页 > 解决方案 > ZeroMQ 重传 pub-sub 模型

问题描述

我正在尝试使用 ZeroMQ javascript 绑定来实现重新传输的 pub-sub 主干。

大意是:

不过,我遇到了需要从单个线程使用的 pub 套接字的问题。

我当前的代码是这样的:

async function listen(name: string, sub: zmq.Subscriber, pub: zmq.Publisher) {
    let id = 0;
    for await (const [topic, msg] of sub) {
        console.log(`BACKBONE | ${name} | received a message id: ${++id} related to: ${topic.toString()} containing message: ${msg.toString()}`);
        await pub.send([topic, msg]);
    }
}

每个入站模块都会对其进行实例化,但是当然它们会在pub.send.

标签: node.jstypescriptzeromq

解决方案


我写了一个队列来序列化套接字访问,从而解决了这个问题。

import * as zmq from "zeromq"

export class PublisherQueue {
    private queue: Buffer[][] = []
    constructor(private publisher: zmq.Publisher) { }

    private static toBuffer(value: Buffer | string): Buffer {
        if (value instanceof Buffer) {
            return value as Buffer;
        } else {
            return Buffer.from(value as string);
        }
    }

    send(topic: Buffer | string, msg: Buffer | string) {
        this.queue.push([PublisherQueue.toBuffer(topic), PublisherQueue.toBuffer(msg)]);
    }

    async run() {
        while (true) {
            if (this.queue.length > 0) {
                let msg = this.queue.shift();
                if (msg !== undefined) {
                    await this.publisher.send(msg);
                }
            } else {
                await new Promise(resolve => setTimeout(resolve, 50));
            }
        }
    }
}

推荐阅读