首页 > 解决方案 > 如何等待 Subject.next

问题描述

假设我有这样的订阅:

mySubject$.subscribe(async inp => await complexFunction(inp))

假设这mySubject$是我拥有的主题。我需要一种方法来做到这一点:

await mySubject$.next(inp)

这样我就可以等待complexFunction完成。

我试图为此创建一个自定义的主题实现,但这似乎并不容易。也许我错过了一些想法,它实际上很简单。

我在这里有什么选择?

更新:

基于 rxjs API 应该接受 Promises 的想法,我尝试了下面的代码。它不起作用,所以再次,我没有想法。

function rigObservable<T>(observable: Observable<T>) {
    let resolve;
    return {
        awaiter: new Promise<void>(res => resolve = res),
        observable: observable.pipe(tap(() => resolve()))
    };
}

fdescribe('test', () => {
    it('should work', async () => {
        const subject = new Subject<string>();
        const wrapper = rigObservable(subject);
        let testValue;
        wrapper.observable.subscribe(inp => of(inp).pipe(delay(1000), tap(t => testValue = t)).toPromise());
        subject.next('text');
        await wrapper.awaiter;
        expect(testValue).toEqual('text');
    });
});

标签: javascriptrxjs

解决方案


我不确定你的意图,但也许这样的事情会起作用:

const source$ = mySubject$.pipe(
    mergeMap(inp => complexFunction(inp))
);

source$.subscribe(
    value => console.log('received value from complexFunction', value)
);

source$是一个可观察的,将发出complexFunction()每次发出的结果mySubject$


推荐阅读