首页 > 解决方案 > 如何连接嵌套在另一个 Observable 中的多个 Observable 集合

问题描述

我是 RxJs 的新手,我正在为一个 Angular 应用程序的复杂场景而苦苦挣扎,其中我有嵌套在其他可观察对象中的可观察对象(以下简化)。

简而言之,我有:

我尝试将所有活动中的所有结果提取到一个 Observable 中。所以,从下面的例子来看(这不是代码,而是数据结构的一个想法):

Observable<SearchActivity[]> => [
 {id: 1, results$: Observable<SearchActivityResult[]> => [1, 2]},
 {id: 2, results$: Observable<SearchActivityResult[]> => [3, 4, 5]} 
 {id: 3, results$: Observable<SearchActivityResult[]> => [6]} 
]

我想提取结果以获得:

Observable<SearchActivityResult[]> => [1, 2, 3, 4, 5, 6] (Edit: the order is not important)

使用 RxJs 如何实现这一点?

到目前为止,我得到了类似的东西:

// This is not working properly
this.allResults$ = this.displayedActivities$.pipe(
  mergeMap(activities => {
    return concat(...activities?.map(activity => activity.results$ || of([])));
  })
);

但它似乎没有按预期工作。

我在这里准备了一个具有更复杂场景的 StackBiltz

2021-02-23 更新

我创建了一个新的StackBlitz,您可以通过单击它与结果进行交互。目标是获取每个数据库(页面底部的表格)的所有结果的摘要,并显示所有搜索条件下正/负结果的数量。

TL;DR => 解决方案

我刚刚创建了一个复杂度较低的StackBlitz ,以隔离问题。我已经包含了提出的建议Mrk Sef并且它正在工作。我对平面运算符有一些其他错误,并且对 json 有一个奇怪的循环引用,但这更有可能是由 StackBlitz 模拟器引起的。

标签: angularmergerxjsobservableconcatenation

解决方案


我认为您的想法是正确的,但是您需要最后一步来将您的流还原为您的价值。

你写的:

this.allResults$ = this.displayedActivities$.pipe(
  mergeMap(activities => 
    concat(...activities?.map(
      activity => activity?.results$ || of([])
    ))
  )
);

中的每个条目都有一个排放activities,但您不想要一系列排放,您想要一个合并/展平结果的数组。

您可以尝试以下方法:

this.allResults$ = this.displayedActivities$.pipe(
  mergeMap(activities => 
    merge(...activities?.map(
      activity => activity?.results$ || of([])
    ))
  ),
  toArray(),
  map(res => res.flat())
);

或者

this.allResults$ = this.displayedActivities$.pipe(
  mergeMap(activities => 
    merge(...activities?.map(
      activity => activity?.results$ || of([])
    ))
  ),
  reduce((acc, curr) => [...acc,...curr], [])
);

更新:

你可以运行这个最小的例子:

const displayedActivities$ = of([
  {results$: of([1,2,3])},
  {results$: of([10,20,30])},
  {something: "else"},
  {results$: of([100,200,300])}
]);

const allResults$ = displayedActivities$.pipe(
  mergeMap(activities => 
    merge(...activities.filter(
      activity => activity?.results$ != null
    ).map(
      activity => activity.results$
    ))
  ),
  reduce((acc, curr) => [...acc,...curr], [])
).subscribe(console.log);

输出:

[1, 2, 3, 10, 20, 30, 100, 200, 300]

更新#2

这就是你可以如何处理将属性 result$ observables 更改为 long-lived observables 的方式。它与上面的 , 非常相似toArray()Array.flat()只是现在它使用combineLatest

这是应该运行的代码。它将保留每个可观察到的最新发射作为最终组合输出的一部分:

模拟设置

创建displayedActivities$一个发出对象数组的对象,每个对象都有一个result$在完成之前发出一定次数(在本例中为 3 次)的属性。

/****
 * Pipeable Operator:
 * Takes arrays emitted by the source and spaces out their
 * values by the given interval time in milliseconds
 ****/
function intervalArray<T>(intervalTime = 1000): OperatorFunction<T[], T> {
  return pipe(
    concatMap((v: T[]) =>
      concat(
        ...v.map((value: T) =>
          EMPTY.pipe(
            delay(intervalTime),
            startWith(value)
          )
        )
      )
    )
  );
}

const displayedActivities$ = of([
  { results$: 
    of([
      [1,2,3],
      [4,5,6],
      [7,8,9]
    ]).pipe(intervalArray(250))
  },
  { results$: 
    of([
      [10,20,30],
      [40,50,60],
      [70,80,90]
    ]).pipe(intervalArray(300))
  },
  { something: "else" },
  { results$: 
    of([
      [100,200,300],
      [400,500,600],
      [700,800,900]
    ]).pipe(intervalArray(350))
  }
]);

合并所有结果$

const allResults$ = displayedActivities$.pipe(
  mergeMap(activities => 
    combineLatest(...activities.filter(
      activity => activity?.results$ != null
    ).map(
      activity => activity.results$.pipe(startWith([]))
    ))
  ),
  map(latest => latest.flat())
).subscribe(console.log);

输出:

[1,2,3]
[1,2,3,10,20,30]
[1,2,3,10,20,30,100,200,300]
[4,5,6,10,20,30,100,200,300]
[4,5,6,40,50,60,100,200,300]
[4,5,6,40,50,60,400,500,600]
[7,8,9,40,50,60,400,500,600]
[7,8,9,70,80,90,400,500,600]
[7,8,9,70,80,90,700,800,900]

推荐阅读