首页 > 解决方案 > 如何流式传输到 Promise?

问题描述

当我POST将数据写入我的应用程序时,我将其写入文件并通过PUT. 我想从POST每个流(文件和 PUT)的状态返回。

putResults是一个数组,它是封闭类的一部分,用于保存每个请求的结果。

我如何收集回复?我可以从中返回一系列 Promises,createWriteStreams但我怎么能req.pipe返回它们呢?你可以流式传输到 Promise 吗?

  post(req, res, next) {
    let listeners = this.getWriteStreams();
    let c = listeners.length;
    for (i = 0; i < c; i++) {
      req.pipe(listeners[i]);
    }
    
    /* What do I put here to return after all requests finish? */

  }

  put(req, res, next) {
    var fstream = fs.createWriteStream(this.path);
    req.pipe(fstream);
    req.on('end', () => {
      fstream.close();
      res.status(201).send("OK");
    });
    req.on('error', (err) => {
      res.status(500).send(err);
    });
  }

  createWriteStreams() {
    let listeners = [];
    // We always want to save to a file
    listeners.push(fs.createWriteStream(this.path).on('close', ()=>{
      this.putResults.push({ host: hutil.myHost, status: 201 });
    })); 
    // If there are other servers in current env, send to them, too!
    let otherGuys = hostutil.otherServers();
    if (otherGuys.length > 0) {
      for (i = 0; i < otherGuys.length; i++) {
        let opts = {
          hostname: hutil.fq(otherGuys[i]),
          port: this.port, 
          path: this.endpoint,
          method: 'PUT',
        };
        let req = https.request(opts, res => {
          this.putResults.push({ host: opts.hostname, status: res.statusCode});
        });
        req.on('error', (e) => {
          this.putResults.push({ host: opts.hostname, status: e });
        });

        listeners.push(req);
      }
    }
    return listeners;
  }

标签: node.jsexpresspromisestream

解决方案


好吧,如果有人有这个问题,知识的关键点是可以传递输入流,就好像它上面有多个自定义龙头一样 - 打开一个似乎不会过早地将数据喷洒到你的其他地方'正在放置软管!

因此,由于您无法流式传输到 Promise,您仍然可以流式传输到流,并且您显然可以花时间设置它们。这是我的解决方案:将请求传递给包装在 Promise 中的流。

function post(req, res, next) {
  let promises = this.streamPromises(req);

  Promise.allSettled(promises).then((results) => {
    // Remove the Promise container junk - results come in 2 completely different object structures. Great design, jeez. :-\
    let noContainer = results.map(item => item.value).filter(i => i != undefined);
    noContainer.push(...results.map(item => item.reason).filter(i => i != undefined));

    res.status(200).json(noContainer);
  }).catch(err => {
    log.warn(`POST request for ${this.filename} failed, at least in part: ${err}`)
    res.status(200).json({ host: hutil.myHost, status: err });
  });
}

function put(req, res, next) {
  var fstream = fs.createWriteStream(this.fqFile);
  req.pipe(fstream);
  req.on('end', () => {
    fstream.close();
    log.info(`${req.transID} Saved data to ${this.fqFile} sent by ${req.referer}`);
    res.status(201).send("OK");
  });
  req.on('error', (err) => {
    log.warn(`${req.transID} Error receiving/saving PUT file ${this.fqFile} sent by ${req.referer}`);
    res.status(500).send(err);
  });
}

function streamPromises(postReq) {
  let listeners = [];

  listeners.push(this.streamLocalFrom(postReq));  // add file first

  // Send to other servers in the environment
  let otherGuys = hosts.peerServers();
  if (otherGuys.length > 0) {
    for (i = 0; i < otherGuys.length; i++) {
      let opts = {
        hostname: hosts.fq(otherGuys[i]),
        thatHost: otherGuys[i], // ducked this into the object to avoid parsing fq hostname
        port: appconfig.port, // we are all listening here
        path: this.endpoint,
        method: 'PUT',
        timeout: 1000,
        ca: fs.readFileSync(appconfig.authorityFile)
      };
      let p = new Promise((resolve, reject) => {
        let req = https.request(opts, res => {
          log.info(`${this.filename}: Response from ${opts.hostname}:${opts.port}: ${res.statusCode}`);
          // let hn = opts.hostname.match(/(.*?)\./)[1] || opts.hostname;
          resolve({ host: opts.thatHost, status: res.statusCode });
        });
        req.on('error', (e) => {
          log.warn(`Error piping ${this.filename} to ${opts.hostname}:${opts.port}: ${e}`);
          reject({ host: opts.thatHost, status: e });
        });
        postReq.pipe(req);
      });
      listeners.push(p);
    }
  }
  return listeners;
}

function streamLocalFrom(postReq) {
  return new Promise((resolve, reject) => {
    let fileError = false;
    let fstream = fs.createWriteStream(this.fqFile);
    fstream.on('close', (err) => {
      if (!fileError) {
        log.info(`Saved data to file at ${this.fqFile}`);
        resolve({ host: me, status: 201 });
      }
    });
    fstream.on('error', (err) => {
      log.warn(`Could not save ${this.fqFile} because ${err}`);
      reject({ host: hutil.myHost, status: 500 });
      fileError = true;
      fstream.close();
    });
    postReq.pipe(fstream);
  });
}

推荐阅读