首页 > 解决方案 > 如何将一系列可观察对象合并为一个。抛出错误“可观察”类型上不存在属性“管道”[]'

问题描述

角度 v 6.1.10
打字稿 v 2.9.2
rxjs v 6.3.3
ng2-stmompjs v 7.0.0

我正在使用ng2-stomp 库来创建可观察的 Web 套接字,它将启动可观察的订阅。在我的要求中,我正在创建基于应用程序 ID 的多个频道订阅,现在想一次性订阅所有这些频道,或者我们可以说更高阶的 observable,因此尝试使用各种 rxjs 运算符,merge但到目前为止没有任何效果。这是我到目前为止所做的。mergeAllconcat

现在这个正在工作

appList = [{appID: '123'}, {appID: '345'}];


 const appList$ = appList.map((appID: string, idx: number) => {
            const headers = Object.assign({}, this.headers, { id: `app_${idx}` });
            const watcher = this.rxStompService.watch(`/topic/${appID}`, headers);
            console.log({ watcher }); // This is observable
            return watcher;
        });



appList$.forEach((app$) => {
            app$.subscribe((message: Message) => {
                const notification: Notification = JSON.parse(message.body);
                this.totalNotificationCount++;
                if (Object.keys(notification).length) {
                    this.notificationMessages.push(notification);
                }
            });
        });

{
  "watcher": { "_isScalar": false, "source": { "source": { "_isScalar": false } }, "operator": { "connectable": { "source": { "_isScalar": false } } } }
}

但我认为我们可以将所有 observables 合并为一个,并且可以全部订阅。请注意,我无法使用ForkJoin,因为 appList 是动态的,因此 WebSocket 的数量也是如此。以下是我将多个 observable 转换为一次的路径。

试验一:使用concatandmap运算符

const batch = appList.map((appID, idx) => {
            console.log({ appID, idx });
            const headers = Object.assign({}, this.headers, { id: `app_${idx}` });
            const watcher = this.rxStompService.watch(`/topic/${appID}`, headers);
            return watcher;
        });



concat(...batch).pipe( map (i => i)).subscribe({ });

这给出了错误:

类型“MonoTypeOperatorFunction”上不存在属性“管道”。

试用 2:使用 subscribe all afterconcat

 concat(...batch).subscribe({
            next: (v: any) => console.log(v),
            complete: () => console.log('Complete')
        });

错误:“MonoTypeOperatorFunction”类型上不存在属性“订阅”。

路径 3:使用pipe

const appList$ = appList.map((appID: string, idx: number) => {
            const headers = Object.assign({}, this.headers, { id: `app_${idx}` });
            const watcher = this.rxStompService.watch(`/topic/${appID}`, headers);
            return watcher;
        });

        console.log({ appList$ });
        appList$.pipe(
            takeUntil(this.ngUnsubscribe),
            tap((i) => {
                console.log('tapping', i);
            })
        );

console.log({appList$}) 返回这个可观察组

  {
  "appList$": [
    {
      "_isScalar": false,
      "source": {
        "source": {
          "_isScalar": false
        }
      },
      "operator": {
        "connectable": {
          "source": {
            "_isScalar": false
          }
        }
      }
    },
    {
      "_isScalar": false,
      "source": {
        "source": {
          "_isScalar": false
        }
      },
      "operator": {
        "connectable": {
          "source": {
            "_isScalar": false
          }
        }
      }
    },
    {
      "_isScalar": false,
      "source": {
        "source": {
          "_isScalar": false
        }
      },
      "operator": {
        "connectable": {
          "source": {
            "_isScalar": false
          }
        }
      }
    },
    {
      "_isScalar": false,
      "source": {
        "source": {
          "_isScalar": false
        }
      },
      "operator": {
        "connectable": {
          "source": {
            "_isScalar": false
          }
        }
      }
    },
    {
      "_isScalar": false,
      "source": {
        "source": {
          "_isScalar": false
        }
      },
      "operator": {
        "connectable": {
          "source": {
            "_isScalar": false
          }
        }
      }
    },
    {
      "_isScalar": false,
      "source": {
        "source": {
          "_isScalar": false
        }
      },
      "operator": {
        "connectable": {
          "source": {
            "_isScalar": false
          }
        }
      }
    }
  ]
}

错误:“Observable []”类型上不存在属性“管道”

所以我的问题是如何将所有可观察的合并到一次并订阅一次

标签: websocketrxjsobservable

解决方案


这真太了不起了; 每当我在这里写问题并重试时,我自己就找到了解决方案。

我已经用这种方式解决了frommergeMap并且感谢这篇有深度的文章

private watchApplications(appList: string[]) {
 const appList$ = from(appList).pipe(
            mergeMap((appID, idx) => {
                const headers = Object.assign({}, this.headers, { id: `app_${idx}` });
                const watcher = this.rxStompService.watch(`/topic/${appID}`, headers);
                return watcher;
            })
        );
        appList$
            .pipe(
                takeUntil(this.ngUnsubscribe),
                tap((f: Frame) => {
                    console.log('tapping Frame', f);
                })
            )
            .subscribe((message: Message) => {
                const notification: Notification = JSON.parse(message.body);
                console.log({ notification });
                this.totalNotificationCount++;
                if (Object.keys(notification).length) {
                    this.notificationMessages.push(notification);
                }
            });

}

推荐阅读