首页 > 解决方案 > 在 RXJS 中执行“有条件的压缩”更简单/更惯用的方法?

问题描述

我正在尝试在 RXJS 中压缩两个 Observable,当满足某个条件时,成对地从每个 Observable 中获取值。我找到了一种无需使用即可完成此操作的方法zip,但我想知道是否有人知道更惯用的方法。

const { zip, merge, Subject } = require("rxjs");
const rxop = require("rxjs/operators");

const a$ = new Subject();
const b$ = new Subject();

// Zipping a$ and b$ on conditional a.n === b.n
const zippedWithCondition$ = merge(
  a$.pipe(
    rxop.mergeMap(aMsg => b$.pipe(
      rxop.find(bMsg => aMsg.n === bMsg.n),
      rxop.map(bMsg => [aMsg, bMsg])
    ))
  ),
  b$.pipe(
    rxop.mergeMap(bMsg => a$.pipe(
      rxop.find(aMsg => aMsg.n === bMsg.n),
      rxop.map(aMsg => [aMsg, bMsg])
    ))
  )
);
const withConditionSub = zippedWithCondition$.subscribe(msg => {
  console.log("[ZIPPED WITH CONDITION]", msg);
});
a$.next({n: 0, type: "a"});
b$.next({n: 1, type: "b"});
a$.next({n: 1, type: "a"});
a$.next({n: 2, type: "a"});
b$.next({n: 2, type: "b"});
b$.next({n: 0, type: "b"});
a$.next({n: 3, type: "a"});
b$.next({n: 3, type: "b"});
withConditionSub.unsubscribe();

// Zipping a$ and b$ without a conditional
const normalZipped$ = zip(a$, b$);
const normalZippedSub = normalZipped$.subscribe(msg => {
  console.log("[NORMAL ZIP]", msg);
});
a$.next({n: 0, type: "a"}); // same order as above
b$.next({n: 1, type: "b"});
a$.next({n: 1, type: "a"});
a$.next({n: 2, type: "a"});
b$.next({n: 2, type: "b"});
b$.next({n: 0, type: "b"});
a$.next({n: 3, type: "a"});
b$.next({n: 3, type: "b"});
normalZippedSub.unsubscribe();

输出:

[ZIPPED WITH CONDITION] [ { n: 1, type: 'a' }, { n: 1, type: 'b' } ]
[ZIPPED WITH CONDITION] [ { n: 2, type: 'a' }, { n: 2, type: 'b' } ]
[ZIPPED WITH CONDITION] [ { n: 0, type: 'a' }, { n: 0, type: 'b' } ]
[ZIPPED WITH CONDITION] [ { n: 3, type: 'a' }, { n: 3, type: 'b' } ]

[NORMAL ZIP] [ { n: 0, type: 'a' }, { n: 1, type: 'b' } ]
[NORMAL ZIP] [ { n: 1, type: 'a' }, { n: 2, type: 'b' } ]
[NORMAL ZIP] [ { n: 2, type: 'a' }, { n: 0, type: 'b' } ]
[NORMAL ZIP] [ { n: 3, type: 'a' }, { n: 3, type: 'b' } ]

看看我[ZIPPED WITH CONDITION]n每一对的匹配如何。对于普通 zip,不会发生这种情况,因为消息是乱序的。

那么有没有更好的方法来做到这一点?也许以一种可扩展的方式压缩任意数量的可观察对象?

标签: functional-programmingstreamrxjs

解决方案


在您的解决方案中,您正在构建2n + 2m可观察对象。换句话说,如果a$发出m个值并b$发出n 个值,则您正在创建2n + 2m 个observables。

要看到这一点,您只需在测试数据序列中添加complete方法a$b$如下例所示

a$.next({n: 0, type: "a"});
b$.next({n: 1, type: "b"});
a$.next({n: 1, type: "a"});
a$.next({n: 2, type: "a"});
b$.next({n: 2, type: "b"});
b$.next({n: 0, type: "b"});
a$.next({n: 3, type: "a"});
b$.next({n: 3, type: "b"});
a$.complete();
b$.complete();
withConditionSub.unsubscribe();

我认为,为了找到一个好的解决方案,你应该考虑时间变量来完成你的要求。例如:"a$并且b$总是发射n属性依次增加 1 的对象 - 没有预见到顺序序列中的漏洞 - 依此类推。

如果不能指定时间相关的需求,很难想象一个 RxJS 的解决方案


推荐阅读