首页 > 解决方案 > 保持对管道操作员之间变量的访问

问题描述

我一直在尝试在节点应用程序中使用 Rxjs。fileList$fs.readdirsync(字符串数组)的返回值。

第一个map()有一个名为文件名的参数。

flatMap() readFileAsObservable()用于bindNodeCallback(fs.readFile)读取文件。

我的课Testian需要 2 个参数;yaml-js通过读取文件和filename从第一个映射创建的对象。如何filename在我指示的管道中访问?

fileList$
    .pipe(
        map((filename: string) => `${resolvedDirPath}/${filename}`),
        flatMap(
            (filePath: string) => readFileAsObservable(filePath, 'utf8') as Observable<string>
        ),
        map((fileData: string) => yaml.safeLoad(fileData) as ITestYaml),
        map((testYaml: ITestYaml) => new Testian(testYaml, [I want to use filename here])),
        flatMap((testYaml: Testian) => {
            const prom: Promise<{}> = activeTests.set(testYaml);
            outgoing.sendTest(testYaml);
            return from(prom);
        })
    )

标签: node.jsrxjs6

解决方案


这在任何涉及链式函数的 API 中都被类似地处理,例如 Promise。

临时变量

临时变量可用于存储超出应访问它的函数范围的值。这是一个简单但非惯用的解决方法:

let filename;

fileList$.pipe(
    map((_filename) => {
      filename = _filename;
      return `${resolvedDirPath}/${filename}`;
    }),
    flatMap((filePath) => readFileAsObservable(filePath, 'utf8')),
    map((fileData) => yaml.safeLoad(fileData)),
    map((testYaml) => new Testian(testYaml, filename)),
    flatMap((testYaml) => {
        const prom = activeTests.set(testYaml);
        outgoing.sendTest(testYaml);
        return from(prom);
    })
)

竞争条件可能存在问题,具体取决于特定的 observable。

嵌套函数

filename可以嵌套使用的函数以从父范围访问变量:

fileList$.pipe(
    flatMap((filename) => of(`${resolvedDirPath}/${filename}`).pipe(
        flatMap((filePath) => readFileAsObservable(filePath, 'utf8')),
        map((fileData) => yaml.safeLoad(fileData)),
        map((testYaml) => new Testian(testYaml, filename)
    ),
    flatMap((testYaml) => {
        const prom = activeTests.set(testYaml);
        outgoing.sendTest(testYaml);
        return from(prom);
    })
)

传递价值

在可能的情况下,该变量可以与其他其他结果一起传递:

fileList$.pipe(
    map((filename) => [filename, `${resolvedDirPath}/${filename}`]),
    flatMap(
        ([filename, filePath]) => forkJoin(filename, readFileAsObservable(filePath, 'utf8')),
    ),
    map(([filename, fileData]) => [filename, yaml.safeLoad(fileData) as ITestYaml)],
    map(([filename, testYaml]) => new Testian(testYaml, filename)),
    flatMap((testYaml) => {
        const prom = activeTests.set(testYaml);
        outgoing.sendTest(testYaml);
        return from(prom);
    })
)

异步..等待

如果流允许切换到 promise 和async..await,则可以这样做,因为函数作用域的问题在函数中不存在async

fileList$.pipe(
    flatMap(async (filename) => {
      const filePath = `${resolvedDirPath}/${filename}`;
      const fileData = await readFileAsObservable(filePath, 'utf8').toPromise();
      let testYaml = yaml.safeLoad(fileData);
      testYaml = new Testian(testYaml, filename);
      const prom = activeTests.set(testYaml);
      outgoing.sendTest(testYaml);
      return prom;
    })
)

由于这个 observable 已经使用flatMap和承诺,它可以单独使用承诺安全地编写。RxJS observables 有一些不适合 promise 的用例,但这不是其中之一。


推荐阅读