首页 > 解决方案 > rxJava如何使flatMap在多线程上运行

问题描述

我希望从 flatMap 发出的每个项目都在自己的线程上运行
这是一个真实用法的简化示例,其中每个项目都是一个 url 请求。
在每个单曲上添加 subscribeOn(Schedulers.io()) 仍然在单个线程上运行
这里的规则是什么?


Integer[] array= new Integer[100];
for (int i = 0; i < 100; i++){
    array[i] = i+1;
}

Observable.fromArray(array)
        .flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
            @Override
            public SingleSource<Integer> apply(Integer i) throws Throwable {
                Log.i(TAG, "apply " +  i + " " + Thread.currentThread().getName());
                return Single.just(i).subscribeOn(Schedulers.io());
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread()) 
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }

            @Override
            public void onNext(@NonNull Integer i) {
               // Log.i(TAG, "onNext " + Thread.currentThread().getName() + i);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {
            }
        });

结果:

2020-12-16 22:54:47.010 10649-10700/com.example.rxjava I/MYTAG: apply 1 RxCachedThreadScheduler-1
2020-12-16 22:54:47.037 10649-10700/com.example.rxjava I/MYTAG: apply 2 RxCachedThreadScheduler-1
2020-12-16 22:54:47.038 10649-10700/com.example.rxjava I/MYTAG: apply 3 RxCachedThreadScheduler-1
2020-12-16 22:54:47.039 10649-10700/com.example.rxjava I/MYTAG: apply 4 RxCachedThreadScheduler-1
2020-12-16 22:54:47.040 10649-10700/com.example.rxjava I/MYTAG: apply 5 RxCachedThreadScheduler-1
2020-12-16 22:54:47.043 10649-10700/com.example.rxjava I/MYTAG: apply 6 RxCachedThreadScheduler-1
2020-12-16 22:54:47.051 10649-10700/com.example.rxjava I/MYTAG: apply 7 RxCachedThreadScheduler-1
2020-12-16 22:54:47.051 10649-10700/com.example.rxjava I/MYTAG: apply 8 RxCachedThreadScheduler-1

标签: javamultithreadingrx-javarx-java3

解决方案


除了使用 之外,您走在正确的轨道上just,它采用现有对象,因此无论之前创建和计算该对象是否发生。在这种情况下,它是flatMapSingle从同一个线程调用的 lambda。

您必须使计算本身成为并行运行的流程的一部分,fromCallable例如:

Observable.fromArray(array)
.flatMapSingle(i -> {
    return Single.fromCallable(() -> {
        Log.i(TAG, "apply " +  i + " " + Thread.currentThread().getName());
        return i + 1000;
    })
    .subscribeOn(Schedulers.io());
})
.observeOn(AndroidSchedulers.mainThread())
// ...
;

推荐阅读