首页 > 解决方案 > 如何从 flatMap 运算符显示可观察对象的内容

问题描述

我试图了解 .fromCallable 的工作原理,所以我创建了下面的简单示例。如下所示,我将一个整数数组转换为 observables,然后我想显示 flatMap 的内容。因此,我使用 .map 遍历数组中的所有项目,但观察者返回

MainActivitygetFromCallableObserver: onNext->o: [I@18fb0dbb

我希望 onNext() 被调用的次数与数组中的项目数一样多,并且对于每次调用 onNext(),我希望对数组中的每个项目进行 ses。

请让我知道如何将整数数组转换为 obervables,然后应用 .map 运算符,以便观察者中的 onNext 分别显示每个项目。

代码

private void executeRxFromCallable() {
    final int[] delayValue = {-1};
    int[] nums = new int[7];
    Observable.fromCallable(new Callable<int[]>() {
        @Override
        public int[] call() throws Exception {
            while(++delayValue[0] < 7) {
                Thread.sleep(1000);
                Log.i(TAG, "delayValue[0]: " + delayValue[0]);
                nums[delayValue[0]] = delayValue[0] * 10;
            }
            Log.i(TAG, "total delay: " + delayValue[0]);
            Log.i(TAG, "nums.length: " + nums.length);
            return nums;
        }
    })
            .flatMap(new Function<int[], ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(@NonNull int[] ints) throws Exception {
                    return Observable.fromArray(ints);
                }
            })
            .map(new Function<Object, Object>() {
                @Override
                public Object apply(@NonNull Object o) throws Exception {
                    return o;
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe( this.getFromCallableObserver());

}

private Observer<? super Object> getFromCallableObserver() {
    return new Observer<Object>() {
        public void onSubscribe(@NonNull Disposable d) {
            Log.i(TAG + "getFromCallableObserver", "onSubscribe->d: " + d);

        }

        @Override
        public void onNext(@NonNull Object o) {
            Log.i(TAG + "getFromCallableObserver", "onNext->o: " + String.valueOf(o));
        }

        @Override
        public void onError(@NonNull Throwable e) {
            Log.i(TAG + "getFromCallableObserver", "onError->e: " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.i(TAG + "getFromCallableObserver", "onComplete");
        }
    };
}

结果

MainActivitygetFromCallableObserver: onNext->o: [I@18fb0dbb

标签: androidrx-javaobservablerx-java2rx-android

解决方案


如果您只想使用以下方法将值列表转换为单独的事件flatMap,则可以这样做Observable.fromIterable(...)

@RunWith(RobolectricTestRunner.class)
public class RxTest {
    @Test
    public void testFromCallableUsage() throws Exception {
        Observable.fromCallable(() -> {
            List<Integer> list = new ArrayList<>();
            for (int i = 0; i < 7; i++) {
                list.add(i + 1);
                //do your sleep here
            }
            return list;
        })
            .flatMap(array -> {
                return Observable.fromIterable(array);
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(result -> System.out.println("Value : " + result),
              throwable -> System.out.println(throwable.getMessage()));

        Thread.sleep(1000);
    }
}

输出

值:1

值:2

价值 : 3

价值 : 4

价值 : 5

价值 : 6

价值 : 7

实际上,为了发送 7 个事件,它们之间的周期为 1000 毫秒,您也可以使用intervalRange。然后,在您的系统中,map您可以进行所需的任何计算/请求,然后通过以下方式进一步发送它们Observable.just(...)

@RunWith(RobolectricTestRunner.class)
public class RxJavaMergeTest {
    @Test
    public void testFromCallableUsage() {
        Observable.intervalRange(0, 7, 0, 1000,  TimeUnit.MILLISECONDS)
            .concatMap(element -> {
                //do whatever modifications you need with your value and then send it
                return Observable.just(element);
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(result -> System.out.println("Value : " + result),
              throwable -> System.out.println(throwable.getMessage()));

        // wait until all the tasks complete
        Thread.sleep(10 * 1000);
    }
}

推荐阅读