android - 如何等待 ObservableEmitter 返回 onComplete()
问题描述
我在TrackerConnectionManager
课堂上有以下方法
@Override
public Observable<TrackerFile> deleteFiles(final List<TrackerFile> trackerFiles) {
Observable<TrackerFile> deleteFilesObservable = Observable.create(new ObservableOnSubscribe<TrackerFile>() {
@Override
public void subscribe(@NonNull ObservableEmitter<TrackerFile> emitter) throws Exception {
Log.i(TAG, "deleteFiles() --> subscribe() --> trackerFiles.size(): " + trackerFiles.size());
for (TrackerFile file : trackerFiles) {
Log.i(TAG, "deleteFiles() --> subscribe() --> emitter.onNext(trackerConnectionAPI.deleteFile()): " + file.getFileName());
final boolean success = trackerConnectionAPI.deleteFile((char) file.getFileId());
// Error mitigation in case that TrackerConnectionAPI returns false
if (success) {
emitter.onNext(file);
} else {
Log.w(TAG, "trackerConnectionAPI.deleteFile() return false for file: " + file);
}
}
Log.i(TAG, "deleteFiles() --> subscribe() --> emitter.onComplete()");
emitter.onComplete();
}
});
这应该调用cleanTracker方法:
private void startTrackerRoutine() {
Disposable disposable = trackerConnectionManager.iterateFiles()
.subscribe(trackerFiles -> {
if (trackerFiles.isEmpty()) {
Log.d(TAG, "startTrackerRoutine() --> no files on Tracker --> disconnect & startNoSessionFoundFragment");
disconnectFromTracker();
activity.startNoSessionFoundFragment();
return;
}
Log.i(TAG, "startTrackerRoutine() --> Tracker has " + trackerFiles.size() + " files to process.");
// TODO: THIS CALL NEEDS TO BE BLOCKING; else we have a RACE CONDITION with the actual transfer of files
cleanTracker(trackerFiles);
ArrayList<TrackerFile> newSessions = SessionFileHelper.findNewSessions(trackerFiles, tracker.getId(), realm);
Log.i(TAG, "startTrackerRoutine() --> found newSessions: " + newSessions);
if (newSessions.isEmpty()) {
Log.d(TAG, "startTrackerRoutine() --> NO NEW Sessions --> disconnect & startNoSessionFoundFragment");
disconnectFromTracker();
activity.startNoSessionFoundFragment();
} else {
activity.startTransferSessionFragment(newSessions);
}
}, throwable -> {
Log.e(TAG, "startTrackerRoutine() --> ERROR in trackerConnectionManager.iterateFiles() --> disconnectFromTracker()", throwable);
disconnectFromTracker();
if (throwable instanceof ForeignSessionException) {
fragment.showForeignTrackerDialog();
} else {
fragment.showConnectionFailedDialog();
}
});
activeRequests.add(disposable);
}
现在我想知道,如何调用此deleteFiles()
方法并等待它完成 -->emitter.onComplete()
被调用。
我试过这个:
private void cleanTracker(final List<TrackerFile> trackerFiles) {
final List<TrackerFile> filesToDelete = SessionFileHelper.findNonSessionFiles(trackerFiles);
Log.i(TAG, "cleanTracker() --> found " + filesToDelete.size() + " files to be deleted");
Disposable disposable = trackerConnectionManager.deleteFiles(filesToDelete)
.subscribe(trackerFile -> {
if (trackerFile != null) {
Log.d(TAG, "cleanTracker() --> successfully deleted: " + trackerFile);
} else {
Log.w(TAG, "cleanTracker() --> FAILED to delete a file");
}
}, throwable -> {
Log.e(TAG, "cleanTracker() --> ERROR while deleting files: " + filesToDelete);
});
Log.d(TAG, "cleanTracker() --> activeRequests.add(disposable)");
activeRequests.add(disposable);
}
但这对我没有帮助,因为它并没有真正等待/阻塞,直到所有文件都被删除并且其他命令弄乱了我的套接字连接。
解决方案
等待子流程完成是concat*
操作员的工作。您的情况可以通过以下方式解决concatMap
:
private void startTrackerRoutine() {
Disposable disposable = trackerConnectionManager.iterateFiles()
.observeOn(Schedulers.io())
.concatMap(trackerFiles -> {
if (trackerFiles.isEmpty()) {
Log.d(TAG, "startTrackerRoutine() --> no files on Tracker --> disconnect & startNoSessionFoundFragment");
disconnectFromTracker();
activity.startNoSessionFoundFragment();
return Observable.empty();
}
Log.i(TAG, "startTrackerRoutine() --> Tracker has " + trackerFiles.size() + " files to process.");
return cleanTrackerFlow(trackerFiles);
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(trackerFiles -> {
ArrayList<TrackerFile> newSessions = SessionFileHelper.findNewSessions(trackerFiles, tracker.getId(), realm);
Log.i(TAG, "startTrackerRoutine() --> found newSessions: " + newSessions);
if (newSessions.isEmpty()) {
Log.d(TAG, "startTrackerRoutine() --> NO NEW Sessions --> disconnect & startNoSessionFoundFragment");
disconnectFromTracker();
activity.startNoSessionFoundFragment();
} else {
activity.startTransferSessionFragment(newSessions);
}
}, throwable -> {
Log.e(TAG, "startTrackerRoutine() --> ERROR in trackerConnectionManager.iterateFiles() --> disconnectFromTracker()", throwable);
disconnectFromTracker();
if (throwable instanceof ForeignSessionException) {
fragment.showForeignTrackerDialog();
} else {
fragment.showConnectionFailedDialog();
}
});
activeRequests.add(disposable);
}
private Observable<List<TrackerFile>> cleanTracker(final List<TrackerFile> trackerFiles) {
final List<TrackerFile> filesToDelete = SessionFileHelper.findNonSessionFiles(trackerFiles);
Log.i(TAG, "cleanTracker() --> found " + filesToDelete.size() + " files to be deleted");
return trackerConnectionManager.deleteFiles(filesToDelete)
.doOnNext(trackerFile -> {
Log.d(TAG, "cleanTracker() --> successfully deleted: " + trackerFile);
})
.doOnError(throwable -> {
Log.e(TAG, "cleanTracker() --> ERROR while deleting files: " + filesToDelete);
})
.ignoreElements()
.andThen(Observable.just(trackerFiles));
}
推荐阅读
- python - ImportError:通过 GUI 运行时没有名为 face_recognition 的模块
- discord.py - 我可以用wavelink python播放本地音频文件吗?
- xml - 在自定义操作栏中制作居中徽标
- javascript - 使用 JS 调用时,Bootstrap 5 折叠显示/隐藏不起作用
- c++ - C++ 乘法表
- python - Pyodbc 错误“TVP 的行必须是序列对象。”,'HY000'
- flutter - 颤振项目的 Android Studio 中缺少“图像资产”选项
- angular - Angular 10 中的 Chart.js:指定颜色未显示在多系列条形图中(而不是随机颜色)
- google-chrome-extension - 检测 Chrome 扩展程序是否提供后退/前进按钮
- ios - UIViewControllerRepresentable 未正确占用空间