首页 > 解决方案 > RxSwift。依次执行单独的 Observables

问题描述

我试图让我的 Observables 仅在之前的 Observable 完成时才执行。我不能使用flatMap,因为订阅可以从不同的地方调用,而这个Observables是没有相互连接的。具体来说:我让我的 CollectionView 从服务器加载更多内容,并且在该用户单击“发送评论”按钮后 2 秒,而 CollectionView 仍在加载其批处理。所以我想等到 CollectionView 更新完成,然后才执行我的评论发布请求。我创建了一个名为 ObservableQueue 的类,它工作得很好。但我需要知道它是否存在内存泄漏、死锁等问题,或者我只是遗漏了一些东西。这里是:

extension CompositeDisposable {

    @discardableResult
    func insert(disposeAction: @escaping () -> ()) -> DisposeKey? {
        return insert(Disposables.create(with: disposeAction))
    }

}

class ObservableQueue {

    private let lock = NSRecursiveLock()
    private let relay = BehaviorRelay(value: 0)
    private let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "ObservableQueue.scheduler")

    func enqueue<T>(_ observable: Observable<T>) -> Observable<T> {
        return Observable.create({ observer -> Disposable in
            let disposable = CompositeDisposable()

            let relayDisposable = self
                .relay
                .observeOn(self.scheduler)
                .filter({ value -> Bool in
                    if value > 0 {
                        return false
                    }

                    self.lock.lock(); defer { self.lock.unlock() }

                    if self.relay.value > 0 {
                        return false
                    }

                    self.relay.accept(self.relay.value + 1)

                    disposable.insert {
                        self.lock.lock(); defer { self.lock.unlock() }
                        self.relay.accept(self.relay.value - 1)
                    }

                    return true
                })
                .take(1)
                .flatMapLatest { _ in observable }
                .subscribe { observer.on($0) }

            _ = disposable.insert(relayDisposable)

            return disposable
        })
    }

}

然后我可以像这样使用它:

let queue = ObservableQueue()

...

// first observable
let observable1 = Observable
    .just(0)
    .delay(5, scheduler: MainScheduler.instance)

queue
    .enqueue(observable1)
    .subscribe(onNext: { _ in
        print("here1")
     })
    .disposed(by: rx.disposeBag)

// second observable
let observable2 = Observable
    .just(0)
    .delay(5, scheduler: MainScheduler.instance)

queue
    .enqueue(observable2)
    .subscribe(onNext: { _ in
        print("here2")
    })
    .disposed(by: rx.disposeBag)

// third observable
let observable3 = Observable
    .just(0)
    .delay(5, scheduler: MainScheduler.instance)

queue
    .enqueue(observable3)
    .subscribe(onNext: { _ in
        print("here3")
    })
    .disposed(by: rx.disposeBag)

标签: iosswiftsynchronizationrx-swiftreactive

解决方案


CLGeocoder 有同样的问题。根据文档,您不能在处理先前请求时调用其中一个地理编码器方法,这与您正在尝试执行的操作非常相似。在这个要点(https://gist.github.com/danilt1263/64bda2a32c18b8c28e1e22085a05df5a)中,您会发现我在后台线程上进行了可观察的调用并使用信号量保护了工作。那是关键,你需要一个信号量,而不是一个锁。

像这样的东西应该适合你:

class ObservableQueue {

    private let semaphore = DispatchSemaphore(value: 1)
    private let scheduler = ConcurrentDispatchQueueScheduler(qos: .userInitiated)

    func enqueue<T>(_ observable: Observable<T>) -> Observable<T> {
        let _semaphore = semaphore // To avoid the use of self in the block below
        return Observable.create { observer in
            _semaphore.wait()
            let disposable = observable.subscribe { event in
                switch event {
                case .next:
                    observer.on(event)
                case .error, .completed:
                    observer.on(event)
                }
            }
            return Disposables.create {
                disposable.dispose()
                _semaphore.signal()
            }
        }
        .subscribeOn(scheduler)
    }
}

推荐阅读