node.js - stream.Transform 项目的延迟处理
问题描述
假设我们有一个简单的 node.js 转换流:
export class JSONParser extends stream.Transform {
constructor() {
super({objectMode: true});
}
}
我想同步处理项目一段时间,然后延迟其余的。像这样的东西:
export class JSONParser extends stream.Transform {
count = 0;
constructor() {
super({objectMode: true});
}
_transform(chunk, encoding, cb) {
const modifiedChunk = this.modify(chunk);
if(count++ % 55 === 0){
process.nextTick(() => this.push(modifiedChunk));
return;
}
this.push(modifiedChunk);
}
}
从理论上讲,这意味着对于每 55 个左右的项目,流将等待下一个滴答声来处理剩余的项目。问题 -
这确实会延迟所有剩余项目的过程,还是只是这一块?它会保留被推送的块的顺序吗?
我相信令牌桶算法可以进行速率限制,也许这是实现非事件循环阻塞流的更好方法?
解决方案
我将尝试回答您的问题并解释原因:
- 这确实会延迟所有剩余项目的过程,还是只是这一块?它会保留被推送的块的顺序吗?
是的,它会,但你需要做一个小的更正。_transform
但是,在调用push 之前,转换流不会调用该方法cb
- 请注意您实际上根本没有调用该方法cb
。您应该在流可以处理下一个块之后执行此操作:
_transform(chunk, encoding, cb) {
const modifiedChunk = this.modify(chunk);
if(count++ % 55 === 0){
process.nextTick(() => {
this.push(modifiedChunk);
cb()
});
return;
}
this.push(modifiedChunk);
cb();
}
- 我相信令牌桶算法可以做速率限制,也许这是实现非事件循环阻塞流的更好方法?
您编写的算法似乎没有进行实际的速率限制 - 至少在chunks per second
. 它只是将一些处理推迟到每个这个数量的块的下一个滴答。
令牌桶将是一个很好的解决方案,您甚至可以PassThrough
使用如下转换方法创建一个简单的流:
new PassThrough({transform(...args) {
// some logic to see if you can push out some chunks
// if so:
return super._transform(...args)
// otherwise
return bucket.ready().then(() => super._transform(...args));
}
如果您需要一些想法,这是我实施的速率限制示例scramjet
。它的操作类似于令牌桶,但它是基于时间的,而不是基于桶大小的——尽管我认为它与相同的结果有关。
推荐阅读
- elasticsearch - 范围过滤器,用于对字段具有相同值的文档计数
- c - 如何读取文件中的特定字符?
- r - 如何修复“符号”类型对象的错误代码不是子集
- python - MicroPython uasyncio 在微控制器和常规 Python 上的行为不同:任务只是保持运行
- jquery - MVC 中的 JQuery Ajax,将表单值传递给 Controller 方法。所有值都是空的,我怎样才能通过 jquery 正确地将表单值传递给控制器?
- kotlin - 为什么 RestTemplate 似乎增加了响应 ID?
- ruby-on-rails - 如何在不在 Rails 中编写新控制器的情况下生成用于测试的任意 flash 消息?
- git - 在 git 配置文件中定义变量?
- java - AbstractBorder 在 JTabbedPane 上绘制
- c++ - C++ 入门第 5 版:未命名枚举创建对象