首页 > 解决方案 > 缓冲区为空时如何停止从 PublishSubject 发射

问题描述

我正在尝试使用 RxJava 创建一个顺序下载服务。用户可以批量添加项目(20、30 等)或单个项目。这些项目将被添加到队列中,然后分批按 10 个顺序下载。为此,我使用的是 PublishSubject:

PublishSubject<Int> pubSubject = PublishSubject.create();

它发出用户添加的项目(id),然后将缓冲区运算符应用于批处理项目。使用这些 id,项目在 flatMap 中下载并在订阅的 onNext 中返回。

  pubSubject.buffer(1, TimeUnit.SECONDS, 10)
            .observeOn(Schedulers.io())
            .flatMap { idsBatch -> downloadByIds(idsBatch) }
            .subscribe(
                /* onNext */ { apiResponse -> handleResponse() },
                /* onError */ { handleError(it) },
                /* onComplete*/ { hideProgressBar() }
             )

代码大部分都按预期工作。项目已成功批处理并下载,但即使在发出所有项目后,缓冲区仍会使用空列表调用 flatMap 并且永远不会调用 onComplete()。

我想知道当缓冲区中没有更多项目时,RxJava 中是否有任何方法或方式来获取 onComplete 回调。因为否则我的下载服务永远不会终止。

标签: androidkotlinrx-javarx-java2rx-android

解决方案


您可以使用以下takeWhile操作:

只要每个项目满足指定条件,就返回一个Observable发射源发出的项目,然后在不满足此条件时立即完成。ObservableSource

pubSubject.buffer(1, TimeUnit.SECONDS, 10)
          .observeOn(Schedulers.io())
          .takeWhile { idsBatch -> idsBatch.isNotEmpty() }
          .flatMap { idsBatch -> downloadByIds(idsBatch) }
          .subscribe(
              /* onNext */ { apiResponse -> handleResponse() },
              /* onError */ { handleError(it) },
              /* onComplete*/ { hideProgressBar() }
           )

推荐阅读