rxjs - 仅当 observable 未完成时取消订阅
问题描述
我有一个 observable,我想在订阅者取消订阅时调用取消(拆卸)逻辑,但前提是源 observable 本身尚未完成(或失败)。
内置finalize
运算符允许在unsubscribe
发生时注册自定义回调,但只要取消订阅是由订阅者或源 observable 的完成引起的,它就会被调用。
我实现了这个辅助函数:
function withCancellation(source, onCancel) {
return new Observable(subscriber => {
let completed = false;
const cancellable = source.pipe(
tap({
error: () => { completed = true; },
complete: () => { completed = true; },
})
);
const subscription = cancellable.subscribe(subscriber);
subscription.add(() => { if (!completed) onCancel(); });
return subscription;
});
}
我可以使用以下方式:
const sourceStream = startJob(jobId); // returns source observable
const cancellableStream = withCancellation(sourceStream, () => stopJob(jobId));
有没有更简洁的方法可以使用任何内置原语来实现相同的目标?
解决方案
推荐阅读
- c++ - 上限 time_point 到运行时定义的持续时间
- python - 使用 xml.dom.minidom 在 Python 中进行 XML 注入
- php - 过滤整个 $_POST 变量 - 这会安全吗?
- sql - 如何修复 OLEDB、SQL 中的更新和设置命令?
- amazon-web-services - 没有得到詹金斯解锁路径
- java - 如何解决:数独求解器堆栈溢出问题
- java - 如何从文本中提取单个单词和 url?
- vue.js - Vuetify v-flex offset-*(1-12) 无法正常工作
- haskell - 如何遍历树结构并更改其数据类型
- javascript - ReactJS - net::ERR_FILE_NOT_FOUND