java - 将多个 observables 连接到一个源
问题描述
我有很多发出不同类型的网络调用,例如字符串、整数等。
我试图让它们平行。
在官方 rxjava 文档中,我们可以阅读:
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);
这个例子很简单,因为我们将所有类型都设为 Int。但是,如果我们有不同的类型,例如 string、boolean、Int 怎么办?
- 来自这 5 的每个呼叫都是独立的
- 这5个调用的组将在一个方法中,并且该方法将由其他随机方法调用。
- 我们可以假设,5 次调用的结果将是第一次调用的类型 ->
string
解决方案
结合 Observables
使用多个源 Observable 来创建单个 Observable 的操作符
And/Then/When — 通过 Pattern 和 Plan 中介组合两个或多个 Observable 发出的项目集
CombineLatest — 当两个 Observable 中的任何一个发出 item 时,通过指定函数组合每个 Observable 发出的最新 item,并根据此函数的结果发出 item
Join — 每当一个 Observable 中的一个项目在根据另一个 Observable 发射的项目定义的时间窗口内发射时,合并两个 Observable 发射的项目
合并——通过合并它们的发射将多个 Observable 合并为一个
StartWith — 在开始从源 Observable 发射项目之前发射指定的项目序列
Switch — 将发出 Observables 的 Observable 转换为单个 Observable,该 Observable 发出由最近发出的那些 Observables 发出的项目
Zip — 通过指定函数将多个 Observable 的发射组合在一起,并根据此函数的结果为每个组合发射单个项目
我认为在你的情况下你可以使用zip
或combineLatest
阅读本文档http://reactivex.io/documentation/operators.html#combining
我想你也需要知道
RxJava 调度器简介。
Schedulers.io() - 这用于执行非 CPU 密集型操作,例如进行网络调用、读取磁盘/文件、数据库操作等,它维护一个线程池。
Schedulers.newThread() - 使用它,每次调度任务时都会创建一个新线程。通常建议不要使用调度程序,除非有一个非常长时间运行的操作。通过 newThread() 创建的线程不会被重用。
Schedulers.computation() - 此调度可用于执行 CPU 密集型操作,如处理大量数据、位图处理等,使用此调度器创建的线程数完全取决于可用的 CPU 内核数。
Schedulers.single() - 此调度程序将按添加的顺序执行所有任务。这可以在需要顺序执行时使用。
Schedulers.immediate() - 此调度程序通过阻塞主线程以同步方式立即执行任务。
Schedulers.trampoline() - 它以先进先出的方式执行任务。通过将后台线程的数量限制为1个,所有的计划任务将被一个一个地执行。
Schedulers.from() - 这允许我们通过限制要创建的线程数从执行程序创建调度程序。当线程池被占用时,任务会排队。
推荐阅读
- python - 从另一个创建列
- sql-server - 如何计算一个月中的星期日并选择第三个?
- python - 从 virtualenv 迁移到 venv
- monitoring - Zabbix 带宽监控
- amazon-web-services - 重新部署站点时出现 aws code-deploy 错误
- c++ - 有关新的 Windows 10 错误的信息:ERROR_CLOUD_FILE_ACCESS_DENIED
- .net - 无法读取配置部分“appSettings”,因为它缺少部分声明
- jasmine - Karma + Jasmine 导致重复警告 WARN: 'Env: not supported, default to development'
- java - 从重定向 URL 的标头中获取位置
- r - 在信息框 Shinydashboard 中打开新窗口