首页 > 解决方案 > 如何在nodejs中使用事件和管道创建函数

问题描述

我正在用打字稿写一个nodejs库,这个库的主要范围是从给定的url下载东西,我想做的是像这样使用它

import library from 'library'

library('https://www.example.com')
    .on('progress', (progress: progress) => {
        //do something with the progress
    })
    .on('end', () => {
        //do something when done
    })
    .pipe(fs.createWriteStream('./test/file.mp4'))

我从来没有以这种方式处理节点事件和流,所以我什至不知道如何去做我正在使用打字稿和 webpack 也请原谅糟糕的英语

标签: javascriptnode.jstypescripteventsnodejs-stream

解决方案


您将需要实现一个可读流。节点流是EventEmitter的实例,因此您可以自动访问事件 API。

至少,您需要实现一个_read()在消费者准备好从队列接收更多数据时在内部调用的方法。由于您希望库报告进度,因此您还需要跟踪已处理的数据量并相应地发出事件。

下面的代码忽略了一些重要的事情,比如背压,但这是一个开始。我使用node-fetch作为请求库,因为它公开了底层响应流并且相当容易使用。

// fileLoader.js
const {Readable} = require('stream')
const fetch = require('node-fetch')

class FileLoader extends Readable {
  constructor(url) {
    super()
    this._url = url
    this._fetchStarted = false
    this._totalLength = 0
    this._currentLength = 0
  }

  _processData(stream) {
    stream
      .on('end', () => {
        this.push(null)
      })
      .on('error', (err) => {
        this.destroy(err)
      })
      .on('data', (chunk) => {
        this._currentLength += chunk.length
        if (this._totalLength) {
          this.emit('progress', Math.round(this._currentLength / this._totalLength * 100))
        }
        this.push(chunk)
      })
  }

  _startFetch() {
    fetch(this._url)
      .then((res) => {
        if (!res.ok) {
          return this.destroy(new Error(`fetch resulted in ${res.status}`))
        }
        this._totalLength = res.headers.get('content-length')
        this._processData(res.body)
      })
      .catch((err) => {
        return this.destroy(new Error(err))
      })
  }

  _read() {
    if (!this._fetchStarted) {
      this._fetchStarted = true
      this._startFetch()
    }
  }
}

module.exports.loadFile = (url) => new FileLoader(url)

和消费者代码:

// consumer.js
const fs = require('fs')
const {loadFile} = require('./fileLoader')

loadFile('http://example.com/video.mp4')
  .on('progress', (progress) => {
    console.log(`${progress}%`)
  })
  .on('end', () => {
    console.log('done')
  })
  .on('error', (err) => {
    console.log(err)
  })
  .pipe(fs.createWriteStream('./tempy.mp4'))

推荐阅读