首页 > 解决方案 > RxSwift 跳过事件直到自己的序列完成

问题描述

我有一个 observable(我们称之为触发器)可以在短时间内发出多次。当它发出时,我正在执行网络请求,并且我正在使用扫描操作员存储结果。

我的问题是我想等到请求完成后再做一次。(但现在如果触发器发出 2 个 observables,那么 fetchData 是否完成并不重要,它会再次执行)

奖励:我也想每 X 秒只取第一个(Debounce 不是解决方案,因为它可以一直发射,我想每 X 秒获得 1 个,它也不是节流,因为如果一个 observable 发出 2时间真的很快我会得到第一个和第二个延迟 X 秒)

编码:

trigger.flatMap { [unowned self] _ in
        self.fetchData()
        }.scan([], accumulator: { lastValue, newValue in
        return lastValue + newValue
    })

并获取数据:

func fetchData() -> Observable<[ReusableCellVMContainer]>

扳机:

let trigger = Observable.of(input.viewIsLoaded, handle(input.isNearBottomEdge)).merge() 

标签: swiftrx-swiftreactivex

解决方案


对不起,我误解了你在下面的回答中试图完成的事情。

将实现您想要的操作员是flatMapFirst. 这将忽略触发器中的事件,直到fetchData()完成。

trigger
    .flatMapFirst { [unowned self] _ in
        self.fetchData()
    }
    .scan([], accumulator: { lastValue, newValue in
        return lastValue + newValue
    })

如果有帮助,我将在下面留下我之前的答案(如果有的话,它有“奖励”答案。)


您遇到的问题称为“背压”,即当可观察对象产生的值比观察者可以处理的速度更快时。

在这种特殊情况下,我建议您不要限制数据获取请求,而是将每个请求映射到一个键,然后按顺序发出数组:

trigger
    .enumerated()
    .flatMap { [unowned self] count, _ in
        Observable.combineLatest(Observable.just(count), self.fetchData())
    }
    .scan(into: [Int: Value](), accumulator: { lastValue, newValue in
        lastValue[newValue.0] = newValue.1
    })
    .map { $0.sorted(by: { $0.key < $1.key }).map { $0.value }}

为了使上述工作,你需要这个:

extension ObservableType {
    func enumerated() -> Observable<(Int, E)> {
        let shared = share()
        let counter = shared.scan(0, accumulator: { prev, _ in return prev + 1 })
        return Observable.zip(counter, shared)
    }
}

这样,您的网络请求将尽快开始,但您不会丢失它们的发出顺序。


对于您的“奖金”,buffer运营商将完全按照您的意愿行事。就像是:

trigger.buffer(timeSpan: seconds, count: Int.max, scheduler: MainScheduler.instance)
    .map { $0.first }

推荐阅读