首页 > 解决方案 > rxjs firstValueFrom 从不解析

问题描述

我正在尝试在 RxJs 上创建类似于事件循环的东西,我将firstValueFrom其用作门等待所有事件都被处理,然后再进一步。目标是让 nodejs 服务运行并对各种事件做出反应,处理这些事件并能够在收到命令时优雅地关闭。

我可以看到我无法向自己解释的行为 - 当可以满足退出条件时-一切都按预期工作:事件由发行者发出并由处理程序处理。

但是,当我消除退出事件出现的可能性时 - 代码在rx.firstValueFrom调用后立即退出。

编码:

import * as rx from "rxjs";
import * as op from "rxjs/operators";

async function foo(): Promise<string> {
    console.log("1");
    const s = new rx.ReplaySubject<string>();
    const t = rx.timer(1000)
        .pipe(
            op.take(3),
            op.map(x => x.toString()),
            op.endWith("exit"),
        );

    const exitObserver = s.asObservable()
        .pipe(
            op.mergeWith(t),
            op.filter(x => x === "exit")
        );
    console.log("2");
    const firstValue = await rx.firstValueFrom(exitObserver);
    console.log("3");
    return firstValue;
}

foo()
    .then(x => console.log(`result: ${x}`))
    .catch(e => console.error(e))
    .finally(() => console.log('finally'))

输出:

1
2
3
result: exit
finally

这是预期的。

具有预期无限循环的更改代码(“退出”事件被注释掉):

import * as rx from "rxjs";
import * as op from "rxjs/operators";

async function foo(): Promise<string> {
    console.log("1");
    const s = new rx.ReplaySubject<string>();
    const t = rx.timer(1000)
        .pipe(
            op.take(3),
            op.map(x => x.toString()),
            //op.endWith("exit"),
        );

    const exitObserver = s.asObservable()
        .pipe(
            op.mergeWith(t),
            op.filter(x => x === "exit")
        );
    console.log("2");
    const firstValue = await rx.firstValueFrom(exitObserver);
    console.log("3");
    return firstValue;
}

foo()
    .then(x => console.log(`result: ${x}`))
    .catch(e => console.error(e))
    .finally(() => console.log('finally'))

输出:

1
2

这不是预期的。我希望这段代码可以无限期地等待“退出事件”。没有错误消息。我使用打字稿 4.3.5、节点 v14.15.4、RxJs 7.4.0。

我的问题是

  1. 为什么更改后的代码不会进入无限循环等待不存在的消息?
  2. 如何使用 RxJs 创建无限循环?

标签: node.jstypescriptrxjs

解决方案


  1. 它无法将 3 写入输出,因为它仍在等待来自 exitObservable 的第一个值。你在那里有一个过滤器,因此它永远不会发生。无限循环的术语在 RxJS 世界中可能具有误导性。

  2. 你可以用它takeUntil来实现你的目标。

const {Subject} = rxjs;
const {filter, takeUntil} = rxjs.operators;

const actions$ = new Subject();

actions$
  .pipe(
    filter(action => action !== 'exit'),
    takeUntil(actions$.pipe(
      filter(action => action === 'exit')
    ))
  )
  .subscribe({
    next: action => console.log(`result: ${action}`),
    error: error => console.error(e),
    complete: () => console.log('exit'),
  });
  
actions$.next('first');
actions$.next('second');
actions$.next('exit');
<script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>


推荐阅读