首页 > 解决方案 > 将多个 observables 连接到一个源

问题描述

我有很多发出不同类型的网络调用,例如字符串、整数等。

我试图让它们平行。

在官方 rxjava 文档中,我们可以阅读:

Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);

这个例子很简单,因为我们将所有类型都设为 Int。但是,如果我们有不同的类型,例如 string、boolean、Int 怎么办?

标签: javaandroidkotlinrx-javarx-java2

解决方案


结合 Observables

使用多个源 Observable 来创建单个 Observable 的操作符

  • And/Then/When — 通过 Pattern 和 Plan 中介组合两个或多个 Observable 发出的项目集

  • CombineLatest — 当两个 Observable 中的任何一个发出 item 时,通过指定函数组合每个 Observable 发出的最新 item,并根据此函数的结果发出 item

  • Join — 每当一个 Observable 中的一个项目在根据另一个 Observable 发射的项目定义的时间窗口内发射时,合并两个 Observable 发射的项目

  • 合并——通过合并它们的发射将多个 Observable 合并为一个

  • StartWith — 在开始从源 Observable 发射项目之前发射指定的项目序列

  • Switch — 将发出 Observables 的 Observable 转换为单个 Observable,该 Observable 发出由最近发出的那些 Observables 发出的项目

  • Zip — 通过指定函数将多个 Observable 的发射组合在一起,并根据此函数的结果为每个组合发射单个项目

我认为在你的情况下你可以使用zipcombineLatest

阅读本文档http://reactivex.io/documentation/operators.html#combining

我想你也需要知道

RxJava 调度器简介。

  • Schedulers.io() - 这用于执行非 CPU 密集型操作,例如进行网络调用、读取磁盘/文件、数据库操作等,它维护一个线程池。

  • Schedulers.newThread() - 使用它,每次调度任务时都会创建一个新线程。通常建议不要使用调度程序,除非有一个非常长时间运行的操作。通过 newThread() 创建的线程不会被重用。

  • Schedulers.computation() - 此调度可用于执行 CPU 密集型操作,如处理大量数据、位图处理等,使用此调度器创建的线程数完全取决于可用的 CPU 内核数。

  • Schedulers.single() - 此调度程序将按添加的顺序执行所有任务。这可以在需要顺序执行时使用。

  • Schedulers.immediate() - 此调度程序通过阻塞主线程以同步方式立即执行任务。

  • Schedulers.trampoline() - 它以先进先出的方式执行任务。通过将后台线程的数量限制为1个,所有的计划任务将被一个一个地执行。

  • Schedulers.from() - 这允许我们通过限制要创建的线程数从执行程序创建调度程序。当线程池被占用时,任务会排队。


推荐阅读