首页 > 解决方案 > 合并 rxjs 流,检测变化并返回一个值

问题描述

背景 - Angular + .net Core WebApi,假设我正在构建一个时间管理应用程序来跟踪您在任务上花费了多少时间 -> 用户在前端创建任务并启动它 -> 有一个计时器显示经过的时间。

这个想法 - 经过的时间来自后端。当用户通过服务在前端启动任务时,我在后端启动一个计时器并将经过的时间传递回前端,以便我可以将其显示给用户。即使后端和前端之间存在连接问题,我也想显示正确的值。

图:后端流每秒发出一次值(我向用户显示的经过时间),但存在连接故障,6 秒后它冻结了片刻,然后发送 9(“0:09”),这可能会使用户感到困惑(“是 6 秒,现在是 9 秒??”)。所以我在每秒发出新值的前端创建和间隔。每一秒我都想检查后端流是否发送了新值,如果没有,我想获取以前的值并修改它,以便用户获得正确的值。

bStream => ---1---2---3---4---5---6---x---x---9
fStream => ---1---2---3---4---5---6---7---8---9

What user sees:
00:01 -> 00:02 -> 00:03 -> 00:04 -> 00:05 -> 00:06 (freeze) -> 00:09

What I want to user to see:
00:01 -> 00:02 -> 00:03 -> 00:04 -> 00:05 -> 00:06 -> (freeze - frontend knows there is no new value so adds 1 second to previous)
So, it should look like that:
00:01 -> 00:02 -> 00:03 -> 00:04 -> 00:05 -> 00:06 -> 00:07 -> 00:08 -> 00:09

我正在努力从哪里开始。

快速摆弄了两个流,但不知道如何找到它 bStream 没有发出新的价值。

https://stackblitz.com/edit/typescript-yookbj

标签: javascripttypescript.net-corerxjsngrx

解决方案


请检查下面的代码。为了模拟同步问题,将代码更改v * 5v * 4,然后计数器一旦得到它就会尊重来自“后端”的值。

// emit every 5s
const source = interval(5000).pipe(
    map(v1 => v1 + 1), // <- only for the example. starting counting from 1.
    startWith(0), // <- only for the example. instant emit of 0.

    map(v => v * 5), // <- every 5 seconds we get right passed amount. so it emits 0 5 10 15.
);
// emit every 1s
const secondSource = interval(1000).pipe(
    delay(50), // <- we want it to be a bit later than original counter.
    map(v1 => v1 + 1), // emits are 1 2 3, not from 0.
    startWith(0), // instant emit of 0.
);

source.pipe( // <- it is our slow backend (it gives us an update every 5 seconds.
    switchMap(v => secondSource.pipe( // if no emit from parent stream - then in 1.05 we get value from this one.
        map(v1 => v + v1), // - adding offset from the second stream to the parent stream.
    )),
).subscribe(v => console.log(v));

现在它从 0 到 N 平稳计数,即使后端有滞后。

更新

还有更简单的方法,但问题是它不依赖后端回答的时间并且有自己的 1 秒周期。

pipe(
  bufferTime(1000), // <- collects for a second.
  scan((a, b) => b.length ? a + 1 : b[0], 0), // assumes or returns.
);

推荐阅读