java - rxJava如何使flatMap在多线程上运行
问题描述
我希望从 flatMap 发出的每个项目都在自己的线程上运行
这是一个真实用法的简化示例,其中每个项目都是一个 url 请求。
在每个单曲上添加 subscribeOn(Schedulers.io()) 仍然在单个线程上运行
这里的规则是什么?
Integer[] array= new Integer[100];
for (int i = 0; i < 100; i++){
array[i] = i+1;
}
Observable.fromArray(array)
.flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer i) throws Throwable {
Log.i(TAG, "apply " + i + " " + Thread.currentThread().getName());
return Single.just(i).subscribeOn(Schedulers.io());
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer i) {
// Log.i(TAG, "onNext " + Thread.currentThread().getName() + i);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
结果:
2020-12-16 22:54:47.010 10649-10700/com.example.rxjava I/MYTAG: apply 1 RxCachedThreadScheduler-1
2020-12-16 22:54:47.037 10649-10700/com.example.rxjava I/MYTAG: apply 2 RxCachedThreadScheduler-1
2020-12-16 22:54:47.038 10649-10700/com.example.rxjava I/MYTAG: apply 3 RxCachedThreadScheduler-1
2020-12-16 22:54:47.039 10649-10700/com.example.rxjava I/MYTAG: apply 4 RxCachedThreadScheduler-1
2020-12-16 22:54:47.040 10649-10700/com.example.rxjava I/MYTAG: apply 5 RxCachedThreadScheduler-1
2020-12-16 22:54:47.043 10649-10700/com.example.rxjava I/MYTAG: apply 6 RxCachedThreadScheduler-1
2020-12-16 22:54:47.051 10649-10700/com.example.rxjava I/MYTAG: apply 7 RxCachedThreadScheduler-1
2020-12-16 22:54:47.051 10649-10700/com.example.rxjava I/MYTAG: apply 8 RxCachedThreadScheduler-1
解决方案
除了使用 之外,您走在正确的轨道上just
,它采用现有对象,因此无论之前创建和计算该对象是否发生。在这种情况下,它是flatMapSingle
从同一个线程调用的 lambda。
您必须使计算本身成为并行运行的流程的一部分,fromCallable
例如:
Observable.fromArray(array)
.flatMapSingle(i -> {
return Single.fromCallable(() -> {
Log.i(TAG, "apply " + i + " " + Thread.currentThread().getName());
return i + 1000;
})
.subscribeOn(Schedulers.io());
})
.observeOn(AndroidSchedulers.mainThread())
// ...
;
推荐阅读
- database - 将存在用户数据库与 microsoft 身份集成?
- javascript - 子组件向数据库添加元素时如何重新渲染父组件?
- java - 无限循环重置偏移并寻找最新偏移
- java - 如何定义 Maven pom.xml 以使用 org.eclipse.jetty.server.ssl.SslSocketConnector
- java - 无法从另一个 docker 容器将消息推送到部署在 docker 容器中的 Kafka 主题
- kubernetes - 将 csi 驱动程序安装到 Kubernetes 集群 v1.16 时出错
- javascript - apollo graphql 突变应该返回哪个值?
- c# - 渲染自适应卡片 Html 时,未使用数据模板
- c++ - 从 UART 接收到错误的字符?
- azure - 如何在keycloak的oidc身份提供者中映射azure object_id?