首页 > 解决方案 > 避免observeOn的副作用

问题描述

我在下面编写了一个模拟逻辑测试,以模拟我试图在我们的代码中创建的行为。我注意到在下面的测试中,发射器上似乎有某种锁定,只有在您使用 observeOn 运算符时才会发生。这给我带来的问题是,它只在锁被释放后才发出对象,并一次性完成所有这些工作。这影响我的并行现实生活案例是创建股票报价流(模拟流和真实流)。它一直等到报价队列(我认为在 observeOn 中)被填满,然后看起来一次用 25,000 个项目钉住我们的订户。我们想使用 observeOn 操作符,因为我们有多个订阅者,我们不希望他们都占用一个 subscribeOn 线程。有什么建议么?

@Test
public void testObserveOnLogicOutsideCreate() {

    Observable.<AtomicInteger>create(numEmit -> {
        observableEmitter = numEmit;
    })
            .observeOn(Schedulers.newThread())
            .subscribe(printNum -> {
                System.out.println(printNum);
                Assertions.assertTrue(nowObserveOnSwitchesOn);
                prevNum = printNum.get();
            });

    while (i < 10) {
        number.set(++i);
        observableEmitter.onNext(number);
        if (i == 10) {
            nowObserveOnSwitchesOn = true;
        }
    }

}

标签: javarx-java2

解决方案


推荐阅读