rx-java - rxjava中的重试缓冲区
问题描述
一个热的 Observable 发射项目。我想将这些项目上传到服务器。有两个考虑:
- 由于io操作的费用,我想批量处理这些项目并作为数组上传
- 由于 io 操作的不可靠性,我希望将失败的批次上传添加到下一批。
Uploads succeed:
1 - 2 - 3 - 4 - 5
------------------
u(1,2,3) - u(4,5)
First upload fails:
1 - 2 - 3 - 4 - 5
------------------
u(1,2,3) - u(1,2,3,4,5)
我可以通过使用buffer
运算符来处理第一个,但不知道如何满足第二个要求。
解决方案
这是我将失败存储在队列中的想法
public class StackOverflow {
public static void main(String[] args) {
// store any failures that may have occurred
LinkedBlockingQueue<String> failures = new LinkedBlockingQueue<>();
toUpload()
// buffer however you want
.buffer(5)
// here is the interesting part
.flatMap(strings -> {
// add any previous failures
List<String> prevFailures = new ArrayList<>();
failures.drainTo(prevFailures);
strings.addAll(prevFailures);
return Flowable.just(strings);
})
.flatMapCompletable(strings -> {
// upload the data
return upload(strings).doOnError(throwable -> {
// if its an upload failure:
failures.addAll(strings);
});
}).subscribe();
}
// whatever your source flowable is
private static Flowable<String> toUpload() {
return Flowable.fromIterable(Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i"));
}
// some upload operation
private static Completable upload(List<String> strings) {
return Completable.complete();
}
}
这里的一些边缘情况是事实,如果最后一个可流动的缓冲组失败,则不会重试。这可以通过retryWhen
操作员来实现,但基本思想与使用队列相同
推荐阅读
- java - 更新哈希映射格式的哈希映射中的对象变量
> - types - 为什么在循环后迭代字符串不执行代码?
- xslt - 通过 XSLT 将 XML 转为文本
- java - 串行链接错误和错误路径 jre 1.7.0_15 32-bi
- html - 从 hta 按钮启动 plink-ssh 连接并将值传递给 ssh 命令
- android - 我应该在 onResume() 调用中从数据库中检索记录吗?
- jenkins-pipeline - Jenkins Scripted Pipeline: sshCommand 执行 statusCode
- php - 无法找到我的 iOS 设备 ID 并连接到 APNS 服务器以在 PHP 中进行推送通知
- swift - 如何在 swift (FCM) 中接收数据通知
- javascript - 如何修复“http://localhost:3000”已被 CORS 策略阻止:请求的资源上不存在“Access-Control-Allow-Origin”标头。