首页 > 解决方案 > RxSwift:在 onNext 之后调用 onCompleted 仅提供已完成的事件

问题描述

我将一些遗留的完成块代码包装在一个 Observable 中。它将发出一个事件(下一个或错误),然后完成。问题是调用onNext(), onCompleted()只将完成的事件发送给观察者。为什么没有next发送事件?

更新:人员流实际上按预期工作。问题出现在下一个流中,filteredPeople。内部完成的事件被传递给它,我只是返回它,这会终止流。

我需要completed从内部流中过滤掉事件。

let people = Observable<Event<[Person]>>()
    .flatMapLatest {
        return fetchPeople().asObservable().materialize()
    }
    .share()

// this is bound to a search field
let filterText = PublishSubject<String>()

let filteredPeople = Observable.combineLatest(people, filterText) { peopleEvent, filter in

    // this is the problem. the completed event from people is being returned, and it terminates the stream
    guard let people = peopleEvent.element else { return peopleEvent }

    if filterText.isEmpty { return .next(people) }

    return .next(people.filter { ... })
}

func fetchPeople() -> Single<[Person]> {
    return Single<[Person]>.create { observer in
        PeopleService.fetch { result in
            switch result {
            case .success(let people):
                observer(.success(people))
            case .failure(let error):
                observer(.error(error))
            }
        }

        return Disposables.create()
    }
}

filteredPeople.subscribe(
    onNext: { event in
        // ?! doesn't get called
    },
    onCompleted: {
        //  we get here, but why?
    },
    onError: {event in
        ...
    }).disposed(by: disposeBag)

标签: rx-swift

解决方案


您尚未发布导致问题的代码。下面的代码按预期工作:

struct Person { }

class PeopleService {
    static func fetch(_ result: @escaping (Result<[Person], Error>) -> Void) {
        result(.success([]))
    }
}

let disposeBag = DisposeBag()

func fetchPeople() -> Single<[Person]> {
    return Single<[Person]>.create { observer in
        PeopleService.fetch { result in
            switch result {
            case .success(let people):
                observer(.success(people))
            case .failure(let error):
                observer(.error(error))
            }
        }

        return Disposables.create()
    }
}

let people = Observable<Void>.just(())
    .flatMapLatest { _ in
        return fetchPeople().asObservable().materialize()
    }
    .share()

people.subscribe(
    onNext: { event in
        print("onNext does get called")
        print("in fact, it will get called twice, once with a .next(.next([Person])) event")
        print("and once with a .next(.completed) event.")
    },
    onCompleted: {
        print("this prints after onNext gets called")
    })
    .disposed(by: disposeBag)

推荐阅读