javascript - 使用 mergeMap 继续在 RxJs 管道中出错
问题描述
我正在使用 RxJs 管道和 mergeMap 运算符进行一些并行 HTTP 获取。
在第一个请求失败时(假设 /urlnotexists 抛出 404 错误)它会停止所有其他请求。
我希望它继续查询所有剩余的 url,而不为这个失败的请求调用所有剩余的 mergeMap。
我尝试使用 RxJs 中的 throwError 和 catchError,但没有成功。
index.js
const { from } = require('rxjs');
const { mergeMap, scan } = require('rxjs/operators');
const request = {
get: url => {
return new Promise((resolve, reject) => {
setTimeout(() => {
if (url === '/urlnotexists') { return reject(new Error(url)); }
return resolve(url);
}, 1000);
});
}
};
(async function() {
await from([
'/urlexists',
'/urlnotexists',
'/urlexists2',
'/urlexists3',
])
.pipe(
mergeMap(async url => {
try {
console.log('mergeMap 1:', url);
const val = await request.get(url);
return val;
} catch(err) {
console.log('err:', err.message);
// a throw here prevent all remaining request.get() to be tried
}
}),
mergeMap(async val => {
// should not pass here if previous request.get() failed
console.log('mergeMap 2:', val);
return val;
}),
scan((acc, val) => {
// should not pass here if previous request.get() failed
acc.push(val);
return acc;
}, []),
)
.toPromise()
.then(merged => {
// should have merged /urlexists, /urlexists2 and /urlexists3
// even if /urlnotexists failed
console.log('merged:', merged);
})
.catch(err => {
console.log('catched err:', err);
});
})();
$ node index.js
mergeMap 1: /urlexists
mergeMap 1: /urlnotexists
mergeMap 1: /urlexists2
mergeMap 1: /urlexists3
err: /urlnotexists
mergeMap 2: /urlexists
mergeMap 2: undefined <- I didn't wanted this mergeMap to have been called
mergeMap 2: /urlexists2
mergeMap 2: /urlexists3
merged: [ '/urlexists', undefined, '/urlexists2', '/urlexists3' ]
我希望在最后一个对象中发出并发 GET 请求并减少它们各自的值。
但是如果发生一些错误,我希望他们不要中断我的管道,而是记录它们。
有什么建议吗?
解决方案
如果你想使用 RxJS,你应该在catchError
使用forkJoin
.
const { of, from, forkJoin } = rxjs;
const { catchError, tap } = rxjs.operators;
// your promise factory, unchanged (just shorter)
const request = {
get: url => {
return new Promise((resolve, reject) => setTimeout(
() => url === '/urlnotexists' ? reject(new Error(url)) : resolve(url), 1000
));
}
};
// a single rxjs request with error handling
const fetch$ = url => {
console.log('before:', url);
return from(request.get(url)).pipe(
// add any additional operator that should be executed for each request here
tap(val => console.log('after:', val)),
catchError(error => {
console.log('err:', error.message);
return of(undefined);
})
);
};
// concurrently executed rxjs requests
forkJoin(["/urlexists", "/urlnotexists", "/urlexists2", "/urlexists3"].map(fetch$))
.subscribe(merged => console.log("merged:", merged));
<script src="https://unpkg.com/@reactivex/rxjs@6.5.3/dist/global/rxjs.umd.js"></script>
推荐阅读
- python - 执行不带语句的 Sql 查询
- c++ - 如何从对其中一个项目的引用中获取 std::vector 元素的索引?
- firebase - SwiftUI Firebase 电话身份验证更改显示名称
- makefile - 如何改变这个makefile的结构
- static - 如何设置 HTTPS:Google Cloud Storage 上的站点
- python - 如何旋转 xtick 标签条形图 plotly express?
- javascript - random number generator not displaying numbers properly when asigning specific min and max
- c# - 将日期格式从 dd/MM/yyyy 转换为 MM/dd/yyyy
- react-native - 反应本机菜单,重复相同的项目
- r - 使用 dplyr 如何根据指标在数据框中为负值