首页 > 解决方案 > RxSwift subscribeOn 和 observeOn 不在预期的后台线程上

问题描述

我有以下调度程序:

let scheduler = ConcurrentDispatchQueueScheduler(queue: .global(qos: .background))

尝试过: observeOn(scheduler) 也尝试过: subscribeOn(scheduler)

由于调度程序,我期望来自订阅的回调将在某些后台线程上执行。

func signIn(withExternalUserID userID: UserID, authenticationToken: String) -> Single<QBUUser> {
        let authenticationToken = "Bearer \(authenticationToken)"
        return .create { observable in
            let request = QBRequest.logIn(
                withUserLogin: "\(userID)",
                password: authenticationToken,
                successBlock: { _, user in
                    user.password = QBSession.current.sessionDetails?.token
                    observable(.success(user)) // <- Here we are on the main thread
                }, errorBlock: { [unowned self] in
                    observable(.error(ChatError.signInError(self.resolve(errorResponse: $0))))  // <- same here: main thread
                }
            )
            return Disposables.create {
                request.cancel()
            }
        }
    }

因为内部API的回调是在主线程中执行的,所以我所有的订阅而不是后台线程也都是在主线程中执行的。

tokenRequest
      .flatMap { [unowned self] token -> Single<QBUUser> in
           return self.chatService.signIn(withExternalUserID: currentUser.id, authenticationToken: token)
      }
      .flatMap { [unowned self] user -> Single<QBUUser> in
           return self.chatService.connect(user: user).andThen(Single.just(user))
      }
      .observeOn(self.scheduler)
      .subscribe(onNext: { [unowned self] user in
            self.chatStorage.set(currentQBUser: user, currentUser: currentUser)
            completable(.completed)
            self.loginSempahore.signal()
       }, onError: { [unowned self] error in
            self.chatStorage.unsetCurrentQBUser()
            completable(.error(error))
            self.loginSempahore.signal()
       })
       .disposed(by: self.disposeBag)

什么是从 flatMap 强制所有回调并订阅由后台线程执行的正确方法?

标签: rx-swift

解决方案


除非库为您提供了执行此操作的选项,否则无法强制函数在特定线程上调用其回调。你的successBlockerrorBlock闭包将在任何线程QBRequest.logIn想要调用它们时被调用,你对此无能为力。

也就是说,.observeOn(_:)操作员会将执行转移到不同的线程,因此在您的最后一个代码示例中,您的onNextonError闭包self.scheduler线程上执行。


推荐阅读