android - 如何对远程服务一一请求调用 usnig rx 和 kotlin?
问题描述
我有应用程序必须向其发送数据的远程服务:retrofit2 中的定义:
interface FooRemoteService {
@POST("/foos")
fun postFoos(@Body foos: List<FooPojo>): Observable<Response<List<String>>
}
但调用的限制一次不超过 X Foos。每个调用都可以返回 206 代码“部分成功”以及不成功的上传 foo 列表。还有 413“请求实体太大”。当然还有 400 和 500。
并且应用程序需要发送未知数量的 foo 项目(由用户在运行时定义)。
为了避免服务应用程序的 DDoS,需要一一发送此调用。
所以我在我的 FooRepositoryImpl 中做了这样的实现:
这是一个想法。我对以下解决方案不满意,我相信它可以做得更好,但我已经没有想法了。那么有什么建议吗?
override fun postFoos(foos: List<Foo>) Completable {
val fooChunks = divideListInToChuncksUnderRequestLimit(foos)
val unuploadedFoos = mutableListOf<UnuploadedFoo>()
fooChunks.fold(unuploadedFoos)
{ accu: MutableList<UnuploadedFoo>, chunk ->
fooRemoteService
.postFoos(chunk)
.subscribeOn(Schedulers.io())
.flatMapCompletable {
if (it.isSuccessful) {
Completable.complete()
} else {
Timber.e("$it")
accu.add(it.body())
}
}.blockingAwait()
responses
}
return Completable.complete()
}
最后,应用程序应显示所有不成功的 foo 列表或任何可用的列表。所以我需要从未上传的 Foos 的功能列表中传递。
解决方案
如果您可以修改postFoos
一点的返回类型,则可以执行以下操作:
override fun postFoos(foos: List<Foo>): Observable<List<UnuploadedFoo>> {
val chunks = foos.chunked(CHUNK_SIZE)
val posters = chunks.map { chunk ->
fooRemoteService.postFoos(chunk)
.map { response ->
response.unUploaded.takeIf { !response.isSuccessful } ?: emptyList()
}
.filter { it.isNotEmpty() }
.toObservable()
}
return Observable.concatDelayError(posters)
}
我想象你的服务有这样的东西:
data class Response(val isSuccessful: Boolean, val unUploaded: List<UnoploadedFoo>)
fun postFoos(foos: List<Foo>): Single<Response>
这里的诀窍是Concat
:
(...) 等待订阅您传递给它的每个额外的 Observable,直到前一个 Observable 完成。
推荐阅读
- kotlin - PeriodicWorkRequest 不重复
- python - PySpark where 子句条件条件
- c# - 在 IIS 上的已发布应用程序中,每次我重新启动服务器或更改应用程序配置中的某些内容时,都会卸载项目引用
- java - 需要澄清 Oracle 教程解释何时使用迭代器与 for-each 构造
- android - android:停止通知振动
- angular - 角度可选 mat-optgroup
- python-3.x - 如何解决 webrtcvad.Error:处理帧时出错?
- java - java.lang.OutOfMemoryError:无法在 Windows 中创建新的本地线程 ..recreate
- c - 这个 c 程序 null 安全吗?
- python - 熊猫多索引,删除条件仍然成立的行?