首页 > 解决方案 > Rxjs - 在订阅者取消订阅后清理 observable

问题描述

我正在使用一个不支持 rxjs 的库,只是很好的回调。我试图将它包装在一个可观察的中,但是当有人取消订阅时,我需要内部流自行处理。

subscribeToQuotes() {
    return new Observable(observer => {

        const stream = someLibrary.getStream();

        stream.onNewData(data => {
            observer.next(data);
        });
        stream.onComplete(() => observer.complete());

        const request = stream.begin(); 

        //I need to call request.abort() when subscriber unsubscribes
    })
}

我想这样使用它:

    const onInstrumentChanged = new Subject();

    onInstrumentChanged.subscribe(instrument => {

        this.subscribeToQuotes(instrument)
            .pipe(takeUntil(onInstrumentChanged))
            .subscribe(quotes => {
                ...
            })


    })

subscribeToQuotes()可以被多次调用,并且takeUntil()丢弃之前的订阅,但是内部流一直在触发。我需要在request.abort()内部打电话。

request.abort()当主题被onInstrumentChanged触发时我怎么能打电话?

这可以在不将主题传递给函数的情况下完成吗?

标签: javascriptrxjs

解决方案


好的,找到答案了。 observer.add()允许您在订阅者取消订阅时添加拆卸功能。所以在我的情况下,这有效:

    return new Observable(observer => {

        const stream = someLibrary.getStream();        
        const request = stream.begin(); 
        observer.add(() => request.abort());
    })


推荐阅读