rxjs - 是否可以在每个可观察到的完成上发出值,这些完成将使用单管道发送给订阅者?
问题描述
我需要按声明的顺序调用多个 Web 请求。每个连续请求都取决于前一个请求的结果。我想在每个请求完成后发出将发送给订阅者的事件。
现在我正在“挖掘”每个请求的值并使用单独Subject
的 . 是否可以使用带有操作员的单管来做到这一点?
这是代码示例
fromEvent(pauseButton, "click")
.pipe(
tap(()=>{
subscribedLabel.innerHTML="";
tapedLabel.innerHTML="";
}),
tap(v => (tapedLabel.innerHTML = "started")),
concatMapTo(of("phase 1 completed").pipe(delay(1000))),
tap(v => (tapedLabel.innerHTML = v)),
concatMapTo(of("phase 2 completed").pipe(delay(1000))),
tap(v => (tapedLabel.innerHTML = v))
)
.subscribe(v => {
console.log(v);
subscribedLabel.innerHTML = v;
});
https://stackblitz.com/edit/typescript-6jza7h?file=index.ts
预期的结果是subscribedLabel.innerHTML
会以同样的方式改变tapedLabel.innerHTML
解决方案
目前尚不清楚您在追求什么,但这是一种您可以使用 4 个连续调用并将所有响应累积到一个对象中的方法。
function fakeHTTP(resW): Observable<string> {
return of(resW).pipe(delay(1000))
}
fromEvent(button, "click").pipe(
concatMap(_ =>
fakeHTTP(1).pipe(
map(res => ({first: res}))
)
),
tap(_ => console.log("First Request Complete")),
concatMap(first =>
fakeHTTP(2).pipe(
map(res => ({...first, second: res}))
)
),
tap(_ => console.log("Second Request Complete")),
concatMap(second =>
fakeHTTP(3).pipe(
map(res => ({...second, third: res}))
)
),
tap(_ => console.log("Third Request Complete")),
concatMap(third =>
fakeHTTP(4).pipe(
map(res => ({...third, fourth: res}))
)
),
tap(_ => console.log("Fourth Request Complete"))
).subscribe(console.log);
其输出如下:
// Wait 1s
First Request Complete
// Wait 1s
Second Request Complete
// Wait 1s
Third Request Complete
// Wait 1s
Fourth Request Complete
{"first":1,"second":2,"third":3,"fourth":4} // <- Value sent to subscribe
更新 #1:将值向上传递到调用链
您可以将值向上传递调用链,但它会变得有点复杂。您希望每个步骤仅处理上一步中的值,但忽略(发出未更改的)来自更上链的值。
您可以这样做的一种方法是标记每个响应。我用一个pass
可以是真或假的标志来做到这一点。最后的操作是删除标志。
看起来是这样的:
function fakeHTTP(resW): Observable<string> {
return of(resW).pipe(delay(1000))
}
fromEvent(button, "click").pipe(
concatMap(_ =>
fakeHTTP(1)
),
tap(_ => console.log("First Request Complete")),
concatMap(first =>
fakeHTTP(2).pipe(
map(res => ({pass: false, payload: res})),
startWith({pass: true, payload: first})
)
),
tap(({pass}) => {
if(!pass) console.log("Second Request Complete")
}),
concatMap(second => second.pass ?
of(second) :
fakeHTTP(3).pipe(
map(res => ({pass: false, payload: res})),
startWith({...second, pass: true})
)
),
tap(({pass}) => {
if(!pass) console.log("Third Request Complete")
}),
concatMap(third => third.pass ?
of(third) :
fakeHTTP(4).pipe(
map(res => ({pass: false, payload: res})),
startWith({...third, pass: true})
)
),
tap(({pass}) => {
if(!pass) console.log("Second Request Complete")
}),
map(({payload}) => payload)
).subscribe(console.log);
其输出如下:
// Wait 1s
First Request Complete // <- console log from tap
1 // <- console log from subscribe
// Wait 1s
Second Request Complete // <- console log from tap
2 // <- console log from subscribe
// Wait 1s
Third Request Complete // <- console log from tap
3 // <- console log from subscribe
// Wait 1s
Second Request Complete // <- console log from tap
4 // <- console log from subscribe
更新#2:当递归是可能的
您还可以进行递归调用,其中每个新调用都依赖于前一个调用,并且某些基本情况会结束递归。RxJS jasexpand
作为一种内置的递归方式。
在此示例中,每个新调用都直接fakeHTTP
使用前一次调用发出的值。
function fakeHTTP(resW): Observable<string> {
return of(resW).pipe(delay(1000))
}
fromEvent(button, "click").pipe(
map(_ => 1),
expand(proj => proj < 4 ?
fakeHTTP(++proj) :
EMPTY
)
).subscribe(console.log);
其输出如下:
// Wait 1s
1
// Wait 1s
2
// Wait 1s
3
// Wait 1s
4
更新 #3:单独的 observables
function fakeHTTP(resW): Observable<string> {
return of(resW).pipe(delay(1000))
}
const first$ = fromEvent(button, "click").pipe(
concatMap(_ => fakeHTTP(1)),
share()
);
const second$ = first$.pipe(
concatMap(first => fakeHTTP(2)),
share()
);
const third$ = second$.pipe(
concatMap(second => fakeHTTP(3)),
share()
);
const fourth$ = third$.pipe(
concatMap(third => fakeHTTP(4))
);
merge(
first$,
second$,
third$,
fourth$
).subscribe(console.log);
这是另一种更烦人的方式来编写几乎完全相同的内容。
function fakeHTTP(resW): Observable<string> {
return of(resW).pipe(delay(1000))
}
fromEvent(button, "click").pipe(
map(_ => fakeHTTP(1).pipe(
share(),
)),
map(first$ => ([first$.pipe(
concatMap(firstR => fakeHTTP(2)),
share()
), first$])),
map(([second$, ...tail]) => ([second$.pipe(
concatMap(secondR => fakeHTTP(3)),
share()
),second$, ...tail])),
map(([third$, ...tail]) => ([third$.pipe(
concatMap(thirdR => fakeHTTP(4))
),third$, ...tail])),
concatMap(calls => merge(...calls))
).subscribe(console.log);
推荐阅读
- java - 查询 Firebird 数据库导致 Java Heap Space 错误
- circleci - Circle CI CLI 在结帐步骤挂起
- java - 将字符串解析为文档
- google-bigquery - 如何覆盖postgresql的数据请求?
- android - 在后台下载项目使用什么
- html - 彩色 DIV 元素末尾的形状
- ios - iOS Swift Parse-强制退出应用程序后调用解析云函数
- css - 在 Wordpress 投资组合列表中悬停时更改图像
- java - 如果我只调用 get、put、remove 并以线程 id 为键并且从不迭代映射,我是否需要 ConcurrentHashMap?
- ios - Visual Studio for Mac 是否支持使用分发配置文件调试 iOS 应用程序?