node.js - rxjs firstValueFrom 从不解析
问题描述
我正在尝试在 RxJs 上创建类似于事件循环的东西,我将firstValueFrom
其用作门等待所有事件都被处理,然后再进一步。目标是让 nodejs 服务运行并对各种事件做出反应,处理这些事件并能够在收到命令时优雅地关闭。
我可以看到我无法向自己解释的行为 - 当可以满足退出条件时-一切都按预期工作:事件由发行者发出并由处理程序处理。
但是,当我消除退出事件出现的可能性时 - 代码在rx.firstValueFrom
调用后立即退出。
编码:
import * as rx from "rxjs";
import * as op from "rxjs/operators";
async function foo(): Promise<string> {
console.log("1");
const s = new rx.ReplaySubject<string>();
const t = rx.timer(1000)
.pipe(
op.take(3),
op.map(x => x.toString()),
op.endWith("exit"),
);
const exitObserver = s.asObservable()
.pipe(
op.mergeWith(t),
op.filter(x => x === "exit")
);
console.log("2");
const firstValue = await rx.firstValueFrom(exitObserver);
console.log("3");
return firstValue;
}
foo()
.then(x => console.log(`result: ${x}`))
.catch(e => console.error(e))
.finally(() => console.log('finally'))
输出:
1
2
3
result: exit
finally
这是预期的。
具有预期无限循环的更改代码(“退出”事件被注释掉):
import * as rx from "rxjs";
import * as op from "rxjs/operators";
async function foo(): Promise<string> {
console.log("1");
const s = new rx.ReplaySubject<string>();
const t = rx.timer(1000)
.pipe(
op.take(3),
op.map(x => x.toString()),
//op.endWith("exit"),
);
const exitObserver = s.asObservable()
.pipe(
op.mergeWith(t),
op.filter(x => x === "exit")
);
console.log("2");
const firstValue = await rx.firstValueFrom(exitObserver);
console.log("3");
return firstValue;
}
foo()
.then(x => console.log(`result: ${x}`))
.catch(e => console.error(e))
.finally(() => console.log('finally'))
输出:
1
2
这不是预期的。我希望这段代码可以无限期地等待“退出事件”。没有错误消息。我使用打字稿 4.3.5、节点 v14.15.4、RxJs 7.4.0。
我的问题是:
- 为什么更改后的代码不会进入无限循环等待不存在的消息?
- 如何使用 RxJs 创建无限循环?
解决方案
它无法将 3 写入输出,因为它仍在等待来自 exitObservable 的第一个值。你在那里有一个过滤器,因此它永远不会发生。无限循环的术语在 RxJS 世界中可能具有误导性。
你可以用它
takeUntil
来实现你的目标。
const {Subject} = rxjs;
const {filter, takeUntil} = rxjs.operators;
const actions$ = new Subject();
actions$
.pipe(
filter(action => action !== 'exit'),
takeUntil(actions$.pipe(
filter(action => action === 'exit')
))
)
.subscribe({
next: action => console.log(`result: ${action}`),
error: error => console.error(e),
complete: () => console.log('exit'),
});
actions$.next('first');
actions$.next('second');
actions$.next('exit');
<script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>
推荐阅读
- javascript - Javascript 页面加载事件
- nlp - 如何更新 GloVe 模型?
- command-line-interface - dbt cli run - 仅项目前或后挂钩?
- html - How to change WKWebView text color?
- database - Prevent denormalization
- java - 仅为响应更改杰克逊映射的 JSON 对象
- logging - 有没有办法通过缩进折叠文本但在某一列开始缩进检查?
- javascript - 如何使用 scriptElement.innerHTML = JSON.stringify 创建具有函数名称的脚本?
- pandas - Using clicks and keys for different events on Matplotlib
- r - 将 xgb.DMatrix 转换为数据框或矩阵