android - 缓冲区为空时如何停止从 PublishSubject 发射
问题描述
我正在尝试使用 RxJava 创建一个顺序下载服务。用户可以批量添加项目(20、30 等)或单个项目。这些项目将被添加到队列中,然后分批按 10 个顺序下载。为此,我使用的是 PublishSubject:
PublishSubject<Int> pubSubject = PublishSubject.create();
它发出用户添加的项目(id),然后将缓冲区运算符应用于批处理项目。使用这些 id,项目在 flatMap 中下载并在订阅的 onNext 中返回。
pubSubject.buffer(1, TimeUnit.SECONDS, 10)
.observeOn(Schedulers.io())
.flatMap { idsBatch -> downloadByIds(idsBatch) }
.subscribe(
/* onNext */ { apiResponse -> handleResponse() },
/* onError */ { handleError(it) },
/* onComplete*/ { hideProgressBar() }
)
代码大部分都按预期工作。项目已成功批处理并下载,但即使在发出所有项目后,缓冲区仍会使用空列表调用 flatMap 并且永远不会调用 onComplete()。
我想知道当缓冲区中没有更多项目时,RxJava 中是否有任何方法或方式来获取 onComplete 回调。因为否则我的下载服务永远不会终止。
解决方案
您可以使用以下takeWhile
操作:
只要每个项目满足指定条件,就返回一个
Observable
发射源发出的项目,然后在不满足此条件时立即完成。ObservableSource
pubSubject.buffer(1, TimeUnit.SECONDS, 10)
.observeOn(Schedulers.io())
.takeWhile { idsBatch -> idsBatch.isNotEmpty() }
.flatMap { idsBatch -> downloadByIds(idsBatch) }
.subscribe(
/* onNext */ { apiResponse -> handleResponse() },
/* onError */ { handleError(it) },
/* onComplete*/ { hideProgressBar() }
)
推荐阅读
- node.js - 在执行第二个查询Nodejs Mongodb之前竞争第一个查询
- python - 元组的约束组合
- azure-devops - Azure Pipeline Ruby Step 失败且没有错误
- karate - 如何在空手道 0.9.0 版本中上传 CSV 文件作为发布请求?
- kendo-grid - IE 11 中的 Kendo Grid 阅读链接标签
- java - 我想知道如何在 Eclipse 中运行处理代码
- javascript - 使用 SSH2 和 SFTPStream 将文件从服务器流式传输到 AWS S3 存储桶
- java - Spring boot AuthenticationSuccessHandler 被忽略
- python - 将多个字典以长格式组合成一个 pandas 数据框
- javascript - 将js文件链接到hbs文件