首页 > 解决方案 > Swift Combine 替代 Rx Observable.create

问题描述

我有一些使用 RxSwift 构建的代码,我正在尝试将其转换为使用 Apple 的 Combine 框架。

一种非常常见的模式是使用Observable.create一次性可观察对象(通常是网络请求)。像这样的东西:

func loadWidgets() -> Observable<[Widget]> {
  return Observable.create { observer in
    // start the request when someone subscribes
    let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
      // publish result on success
      observer.onNext(widgets)
      observer.onComplete()
    }, error: { error in
      // publish error on failure
      observer.onError()
    })
    // allow cancellation
    return Disposable {
      loadTask.cancel()
    }
  }
}

我正在尝试将其映射到Combine,但我无法弄清楚。我能得到的最接近的是使用 Future 来做这样的事情:

func loadWidgets() -> AnyPublisher<[Widget], Error> {
  return Future<[Widget], Error> { resolve in
    // start the request when someone subscribes
    let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
      // publish result on success
      resolve(.success(widgets))
    }, error: { error in
      // publish error on failure
      resolve(.failure(error))
    })
    // allow cancellation ???
  }
}

如您所见,它完成了大部分工作,但无法取消。其次,未来不允许多个结果。

有没有办法做类似 RxObservable.create模式的事情,它允许取消和可选的多个结果?

标签: swiftcombine

解决方案


我想我找到了一种模仿Observable.create使用PassthroughSubjectin的方法Combine。这是我制作的助手:

struct AnyObserver<Output, Failure: Error> {
    let onNext: ((Output) -> Void)
    let onError: ((Failure) -> Void)
    let onComplete: (() -> Void)
}

struct Disposable {
    let dispose: () -> Void
}

extension AnyPublisher {
    static func create(subscribe: @escaping (AnyObserver<Output, Failure>) -> Disposable) -> Self {
        let subject = PassthroughSubject<Output, Failure>()
        var disposable: Disposable?
        return subject
            .handleEvents(receiveSubscription: { subscription in
                disposable = subscribe(AnyObserver(
                    onNext: { output in subject.send(output) },
                    onError: { failure in subject.send(completion: .failure(failure)) },
                    onComplete: { subject.send(completion: .finished) }
                ))
            }, receiveCancel: { disposable?.dispose() })
            .eraseToAnyPublisher()
    }
}

这是它在使用中的样子:

func loadWidgets() -> AnyPublisher<[Widget], Error> {
    AnyPublisher.create { observer in
        let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
          observer.onNext(widgets)
          observer.onComplete()
        }, error: { error in
          observer.onError(error)
        })
        return Disposable {
          loadTask.cancel()
        }
    }
}

推荐阅读