javascript - Rxjs - 在订阅者取消订阅后清理 observable
问题描述
我正在使用一个不支持 rxjs 的库,只是很好的回调。我试图将它包装在一个可观察的中,但是当有人取消订阅时,我需要内部流自行处理。
subscribeToQuotes() {
return new Observable(observer => {
const stream = someLibrary.getStream();
stream.onNewData(data => {
observer.next(data);
});
stream.onComplete(() => observer.complete());
const request = stream.begin();
//I need to call request.abort() when subscriber unsubscribes
})
}
我想这样使用它:
const onInstrumentChanged = new Subject();
onInstrumentChanged.subscribe(instrument => {
this.subscribeToQuotes(instrument)
.pipe(takeUntil(onInstrumentChanged))
.subscribe(quotes => {
...
})
})
subscribeToQuotes()
可以被多次调用,并且takeUntil()
丢弃之前的订阅,但是内部流一直在触发。我需要在request.abort()
内部打电话。
request.abort()
当主题被onInstrumentChanged
触发时我怎么能打电话?
这可以在不将主题传递给函数的情况下完成吗?
解决方案
好的,找到答案了。 observer.add()
允许您在订阅者取消订阅时添加拆卸功能。所以在我的情况下,这有效:
return new Observable(observer => {
const stream = someLibrary.getStream();
const request = stream.begin();
observer.add(() => request.abort());
})
推荐阅读
- java - spring-security:CglibAopProxy 不拦截 GlobalMethodSecurityConfiguration 的方法调用
- wsh - 替代自定义协议(URI 方案)
- codenameone - 代号一原生界面中的iOS动态库.framework
- java - 如何从消息中获取表情符号?
- python-2.7 - Python字典没有用我正在迭代的列表的所有值更新键
- php - 想要将 webmail 登录转换为 SQL 文件
- windows-10 - Active Reports 6 PdfExport.Export() 方法在 Windows 10 创意者更新后引发 ArgumentOutOfRangeException
- c# - 在 .net Core、C# 中并行调用 API 的最佳方法是什么?
- php - 将查询的值与 php 中的 for 循环的值进行比较
- winforms - 如何更改datagridview中单个单元格的高度?