node.js - ZeroMQ 重传 pub-sub 模型
问题描述
我正在尝试使用 ZeroMQ javascript 绑定来实现重新传输的 pub-sub 主干。
大意是:
- 入站模块向骨干网宣布自己,骨干网订阅它们并在其自己的发布者套接字上重新传输
- 出站模块订阅骨干网
不过,我遇到了需要从单个线程使用的 pub 套接字的问题。
我当前的代码是这样的:
async function listen(name: string, sub: zmq.Subscriber, pub: zmq.Publisher) {
let id = 0;
for await (const [topic, msg] of sub) {
console.log(`BACKBONE | ${name} | received a message id: ${++id} related to: ${topic.toString()} containing message: ${msg.toString()}`);
await pub.send([topic, msg]);
}
}
每个入站模块都会对其进行实例化,但是当然它们会在pub.send
.
解决方案
我写了一个队列来序列化套接字访问,从而解决了这个问题。
import * as zmq from "zeromq"
export class PublisherQueue {
private queue: Buffer[][] = []
constructor(private publisher: zmq.Publisher) { }
private static toBuffer(value: Buffer | string): Buffer {
if (value instanceof Buffer) {
return value as Buffer;
} else {
return Buffer.from(value as string);
}
}
send(topic: Buffer | string, msg: Buffer | string) {
this.queue.push([PublisherQueue.toBuffer(topic), PublisherQueue.toBuffer(msg)]);
}
async run() {
while (true) {
if (this.queue.length > 0) {
let msg = this.queue.shift();
if (msg !== undefined) {
await this.publisher.send(msg);
}
} else {
await new Promise(resolve => setTimeout(resolve, 50));
}
}
}
}
推荐阅读
- ios - SQFLITE 错误:SqfliteDatabaseException(DatabaseException(database_closed))
- python - 如何使用heroku和aws s3存储桶在django rest框架中上传图像
- laravel - 按多个字段搜索 Laravel 和 Elasticsearch
- python - 如何通过详尽的枚举找到可被4整除且最接近55的数字
- sql - 查询以从具有重复记录的表中获取最小 id
- node.js - 使用 Nodejs 的 AWS S3 存储桶到存储桶同步
- python - IronPython 脚本使用 C# Reference throws ArgumentTypeException expected got the same value
- spring-boot - 升级 Spring Boot 版本时不存在主题替代名称
- android - 如果文件存在于内部存储中,则为文件名添加后缀
- javascript - 替换对象数组中属性的最佳方法