首页 > 解决方案 > 如何对远程服务一一请求调用 usnig rx 和 kotlin?

问题描述

我有应用程序必须向其发送数据的远程服务:retrofit2 中的定义:

interface FooRemoteService {
    @POST("/foos")
    fun postFoos(@Body foos: List<FooPojo>): Observable<Response<List<String>>
 }

但调用的限制一次不超过 X Foos。每个调用都可以返回 206 代码“部分成功”以及不成功的上传 foo 列表。还有 413“请求实体太大”。当然还有 400 和 500。

并且应用程序需要发送未知数量的 foo 项目(由用户在运行时定义)。

为了避免服务应用程序的 DDoS,需要一一发送此调用。

所以我在我的 FooRepositoryImpl 中做了这样的实现:

这是一个想法。我对以下解决方案不满意,我相信它可以做得更好,但我已经没有想法了。那么有什么建议吗?

override fun postFoos(foos: List<Foo>) Completable {
  val fooChunks = divideListInToChuncksUnderRequestLimit(foos)

  val unuploadedFoos = mutableListOf<UnuploadedFoo>()
  fooChunks.fold(unuploadedFoos) 
  { accu: MutableList<UnuploadedFoo>, chunk ->
     fooRemoteService
        .postFoos(chunk)
        .subscribeOn(Schedulers.io())
        .flatMapCompletable {
             if (it.isSuccessful) {
                         Completable.complete()
                    } else {
                        Timber.e("$it")
                       accu.add(it.body())

                    }
                }.blockingAwait()
        responses

  }
return Completable.complete()
}

最后,应用程序应显示所有不成功的 foo 列表或任何可用的列表。所以我需要从未上传的 Foos 的功能列表中传递。

标签: androidkotlinretrofit2rx-java2rx-android

解决方案


如果您可以修改postFoos一点的返回类型,则可以执行以下操作:

override fun postFoos(foos: List<Foo>): Observable<List<UnuploadedFoo>> {
    val chunks = foos.chunked(CHUNK_SIZE)
    val posters = chunks.map { chunk ->
        fooRemoteService.postFoos(chunk)
                .map { response ->
                    response.unUploaded.takeIf { !response.isSuccessful } ?: emptyList()
                }
                .filter { it.isNotEmpty() }
                .toObservable()
    }

    return Observable.concatDelayError(posters)
}

我想象你的服务有这样的东西:

data class Response(val isSuccessful: Boolean, val unUploaded: List<UnoploadedFoo>)

fun postFoos(foos: List<Foo>): Single<Response>

这里的诀窍是Concat

(...) 等待订阅您传递给它的每个额外的 Observable,直到前一个 Observable 完成。


推荐阅读