首页 > 解决方案 > 如何处理来自 fs readline.Interface 异步迭代器的错误

问题描述

基于processLineByLine()的示例,我注意到如果给定的文件名不存在,我们将无法捕获错误。在这种情况下,程序以如下方式结束:

UnhandledPromiseRejectionWarning:错误:ENOENT:没有这样的文件或目录

因此,我提出可捕获错误的最简单方法是对函数进行 2 处修改processLineByLine()

  1. 将其放入发电机中,例如function*
  2. await文件存在检查await access(filename, fs.constants.F_OK)

最后我不得不将readline.Interface实例转换为异步生成器。我特别不喜欢这最后一部分。结果lines()函数如下:

export async function* lines(filename) {
    await access(filename, fs.constants.F_OK)
    const lines = readline.createInterface({
        input: fs.createReadStream(filename),
        crlfDelay: Infinity
    })
    for await (const l of lines) {
        yield l
    }
}

问题lines():如果文件名不存在,是否有更好的方法可以返回异步迭代器或抛出错误?

BUG 报告:关于 @jfriend00 的观察,我在 nodejs 上打开了一个 Bug 问题:https ://github.com/nodejs/node/issues/30831

标签: javascriptnode.jsasync-iterator

解决方案


嗯,这是一个棘手的问题。即使检测文件是否作为 pre-flight 存在也不能保证您可以成功打开它(它可能被锁定或存在权限问题),并且在打开之前检测它是否存在是服务器开发中的经典竞争条件(小窗口,但仍然是比赛条件)。

我仍然认为必须有更好的方法从 a 中获取错误fs.createReadStream(),但我能找到的唯一方法是将其包装在仅在文件成功打开时才解决的 promise 中。这使您可以从打开文件中获取错误并将其传播回async函数的调用者。这就是它的样子:

const fs = require('fs');
const readline = require('readline');

function createReadStreamSafe(filename, options) {
    return new Promise((resolve, reject) => {
        const fileStream = fs.createReadStream(filename, options);
        fileStream.on('error', reject).on('open', () => {
            resolve(filestream);
        });

    });
}

async function processLineByLine(f) {
  const fileStream = await createReadStreamSafe(f);

  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity
  });

  for await (const line of rl) {
    // Each line in input.txt will be successively available here as `line`.
    console.log(`Line from file: ${line}`);
  }
}

processLineByLine("nofile").catch(err => {
    console.log("caught error");
});

这使得processLineByLine()返回的承诺将被拒绝,您可以在那里处理我认为您所要求的错误。如果我误解了您的要求,请澄清。

仅供参考,在我看来,这似乎是一个错误,readline.createInterface()因为它似乎应该在第一次迭代时拒绝for await (const line of rl),但这似乎不是发生的事情。

因此,即使是这种变通方法也不会在流打开后检测到流上的读取错误。这确实需要在createInterface(). 我同意文件打开错误或读取错误都应显示为拒绝for await (const line of rl)


文件打开问题的另一种解决方法是使用预打开文件await fs.promises.open(...)并将其传递fdfs.createReadStream,然后您自己会在打开时看到错误。


不同的解决方案 - 包装 readLine 迭代器以添加错误处理

警告,这最终看起来有点像 hack,但这是一个非常有趣的学习项目,因为我最终不得不asyncIterator用我自己的 readline 包装,以便在我检测到错误时拒绝(库readStream的错误处理readline不见了)。

我开始着手研究如何编写一个processLineByLine()函数,该函数将返回一个asyncIterator正确拒绝流错误的函数(即使readline代码在这方面存在错误),同时仍在内部使用 readline 库。

目标是能够编写如下代码:

for await (let line of processLineByLine("somefile1.txt")) {
     console.log(line);
 }

正确处理内部使用的 readStream 上的错误,无论文件不存在,存在但无法打开,甚至稍后在读取时遇到读取错误。由于我没有在内部更改/修复 readline 接口代码,因此我必须error在 readStream 上安装我自己的侦听器,当我在那里看到错误时,我需要导致 readline 接口的任何未决或未来的承诺被拒绝。

这就是我最终得到的结果:

// This is an experiment to wrap the lines asyncIterator with our own iterator
// so we can reject when there's been an error on the readStream.  It's really
// ugly, but does work.

const fs = require('fs');
const readline = require('readline');

function processLineByLine(filename, options = {}) {
    const fileStream = fs.createReadStream(filename, options);
    let latchedError = null;
    let kill = new Set();

    fileStream.on('error', (err) => {
        latchedError = err;
        // any open promises waiting on this stream, need to get rejected now
        for (let fn of kill) {
            fn(err);
        }
    });

    const lines = readline.createInterface({
        input: fileStream,
        crlfDelay: Infinity
    });

    // create our own little asyncIterator that wraps the lines asyncIterator
    //   so we can reject when we need to
    function asyncIterator() {
        const linesIterator = lines[Symbol.asyncIterator]();
        return {
            next: function() {
                if (latchedError) {
                    return Promise.reject(latchedError);
                } else {
                    return new Promise((resolve, reject) => {
                        // save reject handlers in higher scope so they can be called 
                        // from the stream error handler
                        kill.add(reject);

                        let p = linesIterator.next();

                        // have our higher level promise track the iterator promise
                        // except when we reject it from the outside upon stream error
                        p.then((data => {
                            // since we're resolving now, let's removing our reject
                            // handler from the kill storage.  This will allow this scope
                            // to be properly garbage collected
                            kill.delete(reject);
                            resolve(data);
                        }), reject);
                    });
                }
            }
        }
    }

    var asyncIterable = {
        [Symbol.asyncIterator]: asyncIterator
    };

    return asyncIterable;
}

async function runIt() {
    for await (let line of processLineByLine("xfile1.txt")) {
         console.log(line);
     }
 }

runIt().then(() => {
    console.log("done");
}).catch(err => {
    console.log("final Error", err);
});

关于这是如何工作的一些解释......

我们自己对流的错误监控

首先,您可以看到:

    fileStream.on('error', (err) => {
        latchedError = err;
        // any open promises waiting on this stream, need to get rejected now
        for (let fn of kill) {
            fn(err);
        }
    });

这是我们自己对 readStream 的错误监控,以弥补 readline 内部缺少的错误处理。每当我们看到错误时,我们将其保存在更高范围的变量中以供以后使用)。

文件打开错误没有特殊处理

这里的部分目标是摆脱先前解决方案中针对文件打开错误的特殊处理。我们希望 readStream 上的任何错误都触发对 asyncIterable 的拒绝,因此这是一种更通用的机制。文件打开错误在此错误处理中被捕获,就像任何其他读取错误一样。

我们自己的 asyncIterable 和 asyncIterator

调用readline.createInterace()返回一个 asyncIterable。它与常规可迭代对象基本相同,因为您在其上调用特殊属性来获取asyncIterator. 它asyncIterator有一个.next()属性,就像一个常规的迭代器,除了当asyncIterator.next()被调用时,它返回一个解析为对象而不是对象的承诺。

所以,这就是for await (let line of lines)工作方式。它首先调用lines[Symbol.asyncIterator]()以获取 asyncIterator。然后,在asyncIterator它返回时,它会反复await asyncIterator.next()等待返回的承诺asyncIterator.next()

现在,readline.createInterface()已经返回了这样一个asyncIterable. 但是,它并不完全正确。当readStream发生错误时,它不会拒绝.next()每次迭代返回的承诺。事实上,这个承诺永远不会被拒绝或解决。所以,事情就停滞不前了。在我的测试应用程序中,应用程序将退出,因为 readStream 已完成(在错误之后)并且不再有任何东西阻止应用程序退出,即使承诺仍处于未决状态。

因此,我需要一种方法来强制拒绝readlineIterator.next()之前返回且当前正在等待的承诺。for await (...)好吧,promise 没有提供拒绝它的外部接口,我们也无权访问readline可以拒绝它的实现的内部。

我的解决方案是用我自己的作为代理包装 readlineIterator。然后,我们自己的错误检测器发现了一个错误,并且 readline 中有未完成的承诺,我可以使用我的代理/包装器来强制拒绝那些未完成的承诺。这将导致for await (...)看到拒绝并得到正确的错误。而且,它有效。

我花了一段时间才充分了解如何asyncIterators包装一个。我非常感谢这篇JavaScript 中的异步迭代器文章,它提供了一些非常有用的代码示例,用于构建您自己的 asyncIterable 和 asyncIterator。这实际上是本练习中真正学习的地方,其他人可以通过理解上面代码中的工作原理来学习。

强制一个包装好的承诺拒绝

这段代码中的“丑陋”来自于强制一个 Promise 从该 Promise 的拒绝处理程序的通常范围之外拒绝。这是通过将拒绝处理程序存储在更高级别的范围内来完成的,在该范围内,readStream可以调用的错误处理会触发该承诺拒绝。可能有一种更优雅的编码方式,但这有效。

使我们自己的 asyncIterable

异步迭代只是一个对象,它有一个名为 的属性[Symbol.asyncIterator]。该属性必须是一个函数,当不带参数调用时,返回一个asyncIterator. 所以,这里是我们的asyncIterable.

var asyncIterable = {
    [Symbol.asyncIterator]: asyncIterator
};

制作我们自己的 asyncIterator

AnasyncIterator是一个函数,它在调用时会返回一个带有next()属性的对象。每次obj.next()调用时,它都会返回一个解析为通常的迭代器元组对象的承诺{done, value}。我们不必担心解析的值,因为我们只需从 readline 的迭代器中获取它。所以,这是我们的asyncIterator

// create our own little asyncIterator that wraps the lines asyncIterator
//   so we can reject when we need to
function asyncIterator() {
    const linesIterator = lines[Symbol.asyncIterator]();
    return {
        next: function() {
            if (latchedError) {
                return Promise.reject(latchedError);
            } else {
                return new Promise((resolve, reject) => {
                    // save reject handlers in higher scope so they can be called 
                    // from the stream error handler
                    kill.push(reject);

                    let p = linesIterator.next();

                    // have our higher level promise track the iterator promise
                    // except when we reject it from the outside upon stream error
                    p.then(resolve, reject);
                });
            }
        }
    }
}

首先,它从 readline 接口(我们正在代理/包装的接口)获取 asyncIterator 并将其存储在本地范围内,以便我们以后可以使用它。

然后,它返回表单的强制迭代器结构{next: fn}。然后,在该函数内部是我们的包装逻辑展开的地方。如果我们已经看到之前的锁存错误,那么我们总是返回Promise.reject(latchedError);。如果没有错误,那么我们返回一个手动构造的 Promise。

在该 Promise 的 executor 函数中,我们通过将其添加到更高范围的Setnamed中来注册我们的拒绝处理kill。这允许我们更高范围的filestream.on('error', ....)处理程序在通过调用该函数看到错误时拒绝该承诺。

然后,我们调用linesIterator.next()以获取它返回的承诺。我们对该承诺的解析和拒绝回调都感兴趣。如果该承诺被正确解决,我们从更高级别的范围中删除我们的拒绝处理程序(以便更好地对我们的范围进行垃圾收集),然后使用相同的解决值解决我们的包装/代理承诺。

如果 linesIterator 承诺拒绝,我们只需通过我们的 wrap/proxy 承诺传递拒绝。

我们自己的文件流错误处理

所以,现在是最后的解释。我们有这个错误处理程序监视流:

fileStream.on('error', (err) => {
    latchedError = err;
    // any open promises waiting on this stream, need to get rejected now
    for (let fn of kill) {
        fn(err);
    }
});

这有两件事。首先,它存储/锁定错误,因此以后对行迭代器的任何调用都将拒绝之前的错误。其次,如果行迭代器有任何待解决的承诺等待解决,它会循环通过killSet 并拒绝这些承诺。这就是让 asyncIterator 被正确拒绝的原因。这应该发生在readline代码内部,但由于它没有正确执行,我们强制我们的 wrap/proxy 承诺拒绝,以便调用者在流出现错误时看到正确的拒绝。


最后,您可以这样做,因为所有丑陋的细节都隐藏在 Wrapped 后面asyncIterable

async function runIt() {
    for await (let line of processLineByLine("xfile1.txt")) {
         console.log(line);
     }
 }

runIt().then(() => {
    console.log("done");
}).catch(err => {
    console.log("final Error", err);
});

推荐阅读