android - 如何连续执行许多 RxJava2 Flux
问题描述
我正在向自己介绍 RxJava2,但我觉得我做错了什么。就我而言,我想做一些以下异步操作。
在此示例中,第一个操作是检查设备是否已连接(wifi 或数据,让我们承认这需要时间),然后我想连接到 api,然后我想进行 http 调用以获取列表(可观察) 然后使用它。如果其中一项操作失败,则应在订阅中引发和处理 onError 或异常。
我有这个工作的代码:
Single.create((SingleEmitter<Boolean> e) -> e.onSuccess(Connectivity.isDeviceConnected(MainActivity.this)) )
.subscribeOn(Schedulers.io())
.flatMap(isDeviceConnected -> {
Log.i("LOG", "isDeviceConnected : "+ isDeviceConnected);
if(!isDeviceConnected)
throw new Exception("whatever"); // TODO : Chercher vrai erreur
return awRepository.getFluxAuthenticate(host, port, user, password); // Single<DisfeApiAirWatch>
})
.toObservable()
.flatMap(awRepository::getFluxManagedApps) // List of apps : Observable<AirwatchApp>
.observeOn(AndroidSchedulers.mainThread())
.doFinally(this::hideProgressDialog)
.subscribe(
app -> Log.i("LOG", "OnNext : "+ app),
error -> Log.i("LOG", "Error : " + error),
() -> Log.i("LOG", "Complete : ")
);
但是做一个为简单的“如果”发出布尔值的人听起来是错误的。Completable 似乎更合乎逻辑(工作与否,继续或停止)。我尝试使用以下代码,但它不起作用。
Completable.create((CompletableEmitter e) -> {
if(Connectivity.isDeviceConnected(MainActivity.this))
e.onComplete(); // Guess not good, should call the complete of subscribe ?
else
e.onError(new Exception("whatever"));
} ).toObservable()
.subscribeOn(Schedulers.io())
.flatMap(awRepository.getFluxAuthenticate(host, port, user, password)) //Single<DisfeApiAirWatch>
.toObservable()
.flatMap(awRepository::getFluxManagedApps) // List of apps : Observable<AirwatchApp>
.observeOn(AndroidSchedulers.mainThread())
.doFinally(this::hideProgressDialog)
.subscribe(
app -> Log.i("LOG", "OnNext : "+ app),
error -> Log.i("LOG", "Error : " + error),
() -> Log.i("LOG", "Complete : ")
);
如何使这段代码工作?
我知道我可以先订阅可兼容的,然后在这个的“onSuccess”中编写另一个通量/其余代码。但我不认为堆栈在彼此内部流动是一个好的解决方案。
此致
解决方案
Completable
没有价值,所以flatMap
永远不会被调用。您必须使用andThen
身份验证成功值并将其作为后续的输入flatMap
:
Completable.create((CompletableEmitter e) -> {
if(Connectivity.isDeviceConnected(MainActivity.this))
e.onComplete();
else
e.onError(new Exception("whatever"));
})
.subscribeOn(Schedulers.io())
.andThen(awRepository.getFluxAuthenticate(host, port, user, password)) // <-----------
.flatMapObservable(awRepository::getFluxManagedApps)
.observeOn(AndroidSchedulers.mainThread())
.doFinally(this::hideProgressDialog)
.subscribe(
app -> Log.i("LOG", "OnNext : "+ app),
error -> Log.i("LOG", "Error : " + error),
() -> Log.i("LOG", "Complete : ")
);
推荐阅读
- jquery - 删除选择选项onclick
- c# - 击中对撞机时使角色旋转 180 度的方法
- javascript - JavaScript 复制一个类属性并创建一个新的相同对象
- php - 具有两个相等但未知部分的正则表达式模式
- python - 如何在莳萝中保存复杂的类对象?
- digital-signature - 有没有办法确保签名者在没有看到消息的情况下生成了有效的(消息、签名)对?
- lua - 使用 Neovim 配置 Packer
- reactjs - 如何测试 Material-UI Popover 关闭实现
- shell - Ubuntu Server 21.10 - 插入 USB 设备时如何运行 Shell 脚本
- android - 在 Android 中切换应用时保持视频通话应用运行