websocket - 如何将一系列可观察对象合并为一个。抛出错误“可观察”类型上不存在属性“管道”[]'
问题描述
角度 v 6.1.10
打字稿 v 2.9.2
rxjs v 6.3.3
ng2-stmompjs v 7.0.0
我正在使用ng2-stomp 库来创建可观察的 Web 套接字,它将启动可观察的订阅。在我的要求中,我正在创建基于应用程序 ID 的多个频道订阅,现在想一次性订阅所有这些频道,或者我们可以说更高阶的 observable,因此尝试使用各种 rxjs 运算符,merge
但到目前为止没有任何效果。这是我到目前为止所做的。mergeAll
concat
现在这个正在工作
appList = [{appID: '123'}, {appID: '345'}];
const appList$ = appList.map((appID: string, idx: number) => {
const headers = Object.assign({}, this.headers, { id: `app_${idx}` });
const watcher = this.rxStompService.watch(`/topic/${appID}`, headers);
console.log({ watcher }); // This is observable
return watcher;
});
appList$.forEach((app$) => {
app$.subscribe((message: Message) => {
const notification: Notification = JSON.parse(message.body);
this.totalNotificationCount++;
if (Object.keys(notification).length) {
this.notificationMessages.push(notification);
}
});
});
{
"watcher": { "_isScalar": false, "source": { "source": { "_isScalar": false } }, "operator": { "connectable": { "source": { "_isScalar": false } } } }
}
但我认为我们可以将所有 observables 合并为一个,并且可以全部订阅。请注意,我无法使用ForkJoin
,因为 appList 是动态的,因此 WebSocket 的数量也是如此。以下是我将多个 observable 转换为一次的路径。
试验一:使用concat
andmap
运算符
const batch = appList.map((appID, idx) => {
console.log({ appID, idx });
const headers = Object.assign({}, this.headers, { id: `app_${idx}` });
const watcher = this.rxStompService.watch(`/topic/${appID}`, headers);
return watcher;
});
concat(...batch).pipe( map (i => i)).subscribe({ });
这给出了错误:
类型“MonoTypeOperatorFunction”上不存在属性“管道”。
试用 2:使用 subscribe all afterconcat
concat(...batch).subscribe({
next: (v: any) => console.log(v),
complete: () => console.log('Complete')
});
错误:“MonoTypeOperatorFunction”类型上不存在属性“订阅”。
路径 3:使用pipe
const appList$ = appList.map((appID: string, idx: number) => {
const headers = Object.assign({}, this.headers, { id: `app_${idx}` });
const watcher = this.rxStompService.watch(`/topic/${appID}`, headers);
return watcher;
});
console.log({ appList$ });
appList$.pipe(
takeUntil(this.ngUnsubscribe),
tap((i) => {
console.log('tapping', i);
})
);
{
"appList$": [
{
"_isScalar": false,
"source": {
"source": {
"_isScalar": false
}
},
"operator": {
"connectable": {
"source": {
"_isScalar": false
}
}
}
},
{
"_isScalar": false,
"source": {
"source": {
"_isScalar": false
}
},
"operator": {
"connectable": {
"source": {
"_isScalar": false
}
}
}
},
{
"_isScalar": false,
"source": {
"source": {
"_isScalar": false
}
},
"operator": {
"connectable": {
"source": {
"_isScalar": false
}
}
}
},
{
"_isScalar": false,
"source": {
"source": {
"_isScalar": false
}
},
"operator": {
"connectable": {
"source": {
"_isScalar": false
}
}
}
},
{
"_isScalar": false,
"source": {
"source": {
"_isScalar": false
}
},
"operator": {
"connectable": {
"source": {
"_isScalar": false
}
}
}
},
{
"_isScalar": false,
"source": {
"source": {
"_isScalar": false
}
},
"operator": {
"connectable": {
"source": {
"_isScalar": false
}
}
}
}
]
}
错误:“Observable []”类型上不存在属性“管道”
所以我的问题是如何将所有可观察的合并到一次并订阅一次
解决方案
这真太了不起了; 每当我在这里写问题并重试时,我自己就找到了解决方案。
我已经用这种方式解决了from
,mergeMap
并且感谢这篇有深度的文章
private watchApplications(appList: string[]) {
const appList$ = from(appList).pipe(
mergeMap((appID, idx) => {
const headers = Object.assign({}, this.headers, { id: `app_${idx}` });
const watcher = this.rxStompService.watch(`/topic/${appID}`, headers);
return watcher;
})
);
appList$
.pipe(
takeUntil(this.ngUnsubscribe),
tap((f: Frame) => {
console.log('tapping Frame', f);
})
)
.subscribe((message: Message) => {
const notification: Notification = JSON.parse(message.body);
console.log({ notification });
this.totalNotificationCount++;
if (Object.keys(notification).length) {
this.notificationMessages.push(notification);
}
});
}
推荐阅读
- reactjs - useEffect 清理功能不适用于 history.push()
- android - registerForActivityResult 可以在协程中使用吗?
- r - 如何为合并设置应该被视为相等的值
- java - 正确覆盖已在 Parent 方法中实现的 Grandparent 方法
- azure - Azure DevOps 中的原始估计值 - 最大值?
- javascript - 如果未选中任何按钮,如何使单选按钮选项成为必需并弹出错误横幅?
- angular - 与firebase一起使用时异步不起作用
- reactjs - React Native Reanimated 函数渲染如何只拍摄一次
- python - 如何使用 pydriller 从特定分支获取提交?
- pandas - 如何将不可变 MultiDict 转换为数据框