首页 > 解决方案 > stream.Transform 项目的延迟处理

问题描述

假设我们有一个简单的 node.js 转换流:

export class JSONParser extends stream.Transform {

  constructor() {
    super({objectMode: true});
  }
}

我想同步处理项目一段时间,然后延迟其余的。像这样的东西:

export class JSONParser extends stream.Transform {

  count = 0;

  constructor() {
    super({objectMode: true});
  }

  _transform(chunk, encoding, cb) {

   const modifiedChunk = this.modify(chunk);

   if(count++ % 55 === 0){
     process.nextTick(() => this.push(modifiedChunk));
     return;
   }

     this.push(modifiedChunk);
  }
}

从理论上讲,这意味着对于每 55 个左右的项目,流将等待下一个滴答声来处理剩余的项目。问题 -

  1. 这确实会延迟所有剩余项目的过程,还是只是这一块?它会保留被推送的块的顺序吗?

  2. 我相信令牌桶算法可以进行速率限制,也许这是实现非事件循环阻塞流的更好方法?

标签: node.jsnodejs-stream

解决方案


我将尝试回答您的问题并解释原因:

  1. 这确实会延迟所有剩余项目的过程,还是只是这一块?它会保留被推送的块的顺序吗?

是的,它会,但你需要做一个小的更正。_transform但是,在调用push 之前,转换流不会调用该方法cb- 请注意您实际上根本没有调用该方法cb。您应该在流可以处理下一个块之后执行此操作:

  _transform(chunk, encoding, cb) {

   const modifiedChunk = this.modify(chunk);

    if(count++ % 55 === 0){
      process.nextTick(() => {
        this.push(modifiedChunk); 
        cb()
      });
      return;
    }

    this.push(modifiedChunk);
    cb();

  }
  1. 我相信令牌桶算法可以做速率限制,也许这是实现非事件循环阻塞流的更好方法?

您编写的算法似乎没有进行实际的速率限制 - 至少在chunks per second. 它只是将一些处理推迟到每个这个数量的块的下一个滴答。

令牌桶将是一个很好的解决方案,您甚至可以PassThrough使用如下转换方法创建一个简单的流:

new PassThrough({transform(...args) {
   // some logic to see if you can push out some chunks
   // if so:
   return super._transform(...args)
   // otherwise
   return bucket.ready().then(() => super._transform(...args));
}   

如果您需要一些想法,这是我实施的速率限制示例scramjet。它的操作类似于令牌桶,但它是基于时间的,而不是基于桶大小的——尽管我认为它与相同的结果有关。


推荐阅读