首页 > 解决方案 > fromIterable 之后如何保证订阅的顺序?

问题描述

这是一个简化的示例,但基本上可以归结为

Observable.just(listOfItems)
    .flatMapIterable { it }
    .flatMap {
        doWhatever()
            .subscribeOn(Schedulers.io())
    }

我观察到并猜想这是有道理的,虽然 iterable 是按顺序 onNext 的,但它doWhatever()是按随机顺序订阅的。

有没有办法可以保证 doWhatever 订阅的顺序?我不想要 concatMap,而 concatMapEager 似乎只是订购结果,而不是订阅。基于索引的人为延迟?(似乎只有更高的延迟才可靠,无论如何都是神奇的数字 - 不好)

///

private val uploadSchedules = Schedulers.from(Executors.newFixedThreadPool(5))

fun upload(request: Request) : Completable {
      Observable.just(request)
         .flatMap { request -> upload(request).subscribeOn(uploadScheduler) }
}

fun upload(requests: List<Request>) : Completable {
      Observable.fromIterable(requests)
         // here the subscriptions order are race-y, I want to want them ordered by the order of the list--|
         .flatMap { request -> upload(request)                                                             |
                        .subscribeOn(uploadScheduler)                                                      |
                        .doOnSubscribe { log(request) }  <-------------------------------------------------|
         }
}

// log outputs:
// request 0
// request 3
// request 4
// request 2
// request 1
// etc, race-y, random order

标签: rx-javarx-java2

解决方案


推荐阅读