首页 > 解决方案 > Observable.create 和 fromPublisher 有什么区别?

问题描述

在下面的源代码中更改Observable.createObservable.fromPublisher不起作用。(如果 sample 不存在,则全部订阅,但如果 sample 存在,则不订阅任何内容。)

Observable.create 和 fromPublisher 有什么区别?

import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

public class SampleMain {
    public static void main(String[] args) {
        Observable<String> o = Observable.create(s -> {
            new Thread(() -> {
                for (int i=0; i<100; i++) {
                    s.onNext("Hello Observable.fromPublisher() A" + i);
                    s.onNext("Hello Observable.fromPublisher() B" + i);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                s.onComplete();
            }).start();
        });
        o
                .sample(1, TimeUnit.MILLISECONDS)
                .subscribe(System.out::println);
    }
}

标签: rx-java2

解决方案


fromPublisher需要一个org.reactivestreams.Publisher遵守规则的正确实施。这些Publisher通常来自 3rd 方库或 API。

create内置了基础设施,可以将更简单的发射器样式 API 转换为一个Observable,这样开发人员就不必担心太多的底层协议。

我还可以将您的注意力引向以下的 javadoc fromPublisher

Publisher 必须遵循 Reactive-Streams 规范。违反规范可能会导致未定义的行为。

如果可能,请使用 create(ObservableOnSubscribe) 创建类似源的 Observable。

请注意,尽管 Publisher 似乎是一个功能接口,但不建议通过 lambda 实现它,因为规范要求使用无状态 lambda 无法实现的状态管理。


推荐阅读