首页 > 技术文章 > RxJava2实战--第二章 RxJava基础知识

wangjiaghe 2019-11-19 17:18 原文

第二章 RxJava基础知识

1. Observable

1.1 RxJava的使用三步骤

  1. 创建Observable
  2. 创建Observer
  3. 使用subscribe()进行订阅
Observable.just("Hello World")
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
});

subscribe()有多个重载方法:

Disposable subscribe()

Disposable subscribe(Consumer<? super T> onNext)

Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)

Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete) 

Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) 
            
void subscribe(Observer<? super T> observer)            
            

Consumer是消费者。

onComplete是一个Action,它与Consumer的区别如下:

  • Action:无参数
  • Consumer:单一参数类型

第五种重载例子:

Observable.just("Hello World")
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(s);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        System.out.println(throwable.getMessage());
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println("onComplete");
                    }
                }, new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        System.out.println("subscribe");
                    }
                });
 执行结果:
 	subscribe
	Hello World
	onComplete
                

Rxjava2中,Observable不在支持Subscriber,而是需要使用Observer作为观察者。

 Observable.just("Hello World")
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println(e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
                
 执行结果:
 	onSubscribe
	Hello World
	onComplete

在Rxjava中,被观察者、观察者、subscribe()方法三者缺一不可。只有使用了subscribe(),被观察者才会开始发送数据。

1.2 RxJava2的五种观察者模式:

类型 描述
Observable 能够发射0或n个数据,并以成功或者错误事件终止
Flowable 能够发射0或n个数据,并以成功或者错误事件终止。支持背压,可以控制数据源发射的速度
Single 只发射单个数据或者错误事件
Completable 从来不发射数据,只处理onComplete和onError事件。可以看成Rx的Runnable
Maybe 能够发射0或者1个数据,要么成功,要么失败。有点类似于Optional

5种被观察者类型中只有Flowable支持背压。

1.3 do操作符

操作符 用途
doOnSubscribe 一旦观察者订阅了Observable,它就会被调用
doOnLifecycle 可以在观察者订阅之后,设置是否取消订阅
doOnNext 它产生的Observable每发射一项数据就会调用它一次,它的Consumer接受发射的数据项。一般用于在subscribe之前对数据进行处理
doOnEach 它产生的Observable每发射一项数据就会调用它一次,不仅包括onNext,还包括onError和onComplete
doAfterNext 在onNext之后执行,而doOnNext()是在onNext之前执行
doOnComplete 当产生的Observable在正常终止调用onComplete时会被调用
doFinally 在当它产生的Observable终止之后会被调用,无论是正常终止还是异常终止。doFinally优先于doAfterTerminate的调用
doAfterTerminate 注册一个Action,当Observable调用onComplete或onError时调用
 public static void main(String[] args) {
        Observable.just("Hello World")
            .doOnNext(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("doOnNext:"+s);
            }
        }).doAfterNext(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("doAfterNext:"+s);
            }
        }).doOnComplete(new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("doOnComplete");
            }
        }).doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                System.out.println("doOnSubscribe:");

            }
        }).doAfterTerminate(new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("doAfterTerminate:");
            }
        }).doOnEach(new Consumer<Notification<String>>() {
            @Override
            public void accept(Notification<String> stringNotification) throws Exception {
                System.out.println("doOnEach:"+(stringNotification.isOnNext()?"onNext":stringNotification.isOnComplete()?"onComplete":"onError")+"  value:"+stringNotification.getValue());
            }
        }).doFinally(new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("doFinally:");
            }
        }).doOnLifecycle(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                System.out.println("doOnLifecycle:"+disposable.isDisposed());
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("doOnLifecycle: run");
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe:");
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext:"+s);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError:"+e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete:");
            }
        });
        
        执行结果:
        doOnSubscribe:
        doOnLifecycle:false
        onSubscribe:
        doOnNext:Hello World
        doOnEach:onNext  value:Hello World
        onNext:Hello World
        doAfterNext:Hello World
        doOnComplete
        doOnEach:onComplete  value:null
        onComplete:
        doFinally:
        doAfterTerminate:

doOnEach与doOnNext、doOnComplete()、doOnError执行顺序:谁在前面先执行谁。

doOnEach放到doOnNext上面执行:

        ...
        .doOnEach(new Consumer<Notification<String>>() {
            @Override
            public void accept(Notification<String> stringNotification) throws Exception {
                System.out.println("doOnEach:"+(stringNotification.isOnNext()?"onNext":stringNotification.isOnComplete()?"onComplete":"onError")+"  value:"+stringNotification.getValue());
            }
        }).doOnNext(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("doOnNext:"+s);
        }
        })...
        
        执行结果:
        
        doOnSubscribe:
        doOnLifecycle:false
        onSubscribe:
        doOnEach:onNext  value:Hello World
        doOnNext:Hello World
        onNext:Hello World
        doAfterNext:Hello World
        doOnEach:onComplete  value:null
        doOnComplete
        onComplete:
        doFinally:
        doAfterTerminate:
        

2.HotObservable和Cold Observable

2.1 Observabel分类

Observable 有 Cold 和 Hot 之分。

Hot Observable 无论有没有 Subscriber 订阅,事件始终都会发生。当 Hot Observable 有多个订阅者时,Hot Observable 与订阅者们的关系是一对多的关系,可以与多个订阅者共享信息。

Cold Observable是 只有 Subscriber 订阅时,才开始执行发射数据流的代码。并且 Cold Observable 和 Subscriber 只能是一对一的关系,当有多个不同的订阅者时,消息是重新完整发送的。也就是说对 Cold Observable 而言,有多个Subscriber的时候,他们各自的事件是独立的。

2.2 Cold Observable

Observable 的 just、creat、range、fromXXX 等操作符都能生成Cold Observable。

        Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };

        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };

        Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() 		{
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread());

        observable.subscribe(subscriber1);
        observable.subscribe(subscriber2);

        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

执行结果:

subscriber1: 0
   subscriber2: 0
subscriber1: 1
   subscriber2: 1
subscriber1: 2
   subscriber2: 2
   subscriber2: 3
subscriber1: 3
subscriber1: 4
   subscriber2: 4
   subscriber2: 5
subscriber1: 5
subscriber1: 6
   subscriber2: 6
subscriber1: 7
   subscriber2: 7
subscriber1: 8
   subscriber2: 8
subscriber1: 9
   subscriber2: 9

可以看出,subscriber1 和 subscriber2 的结果并不一定是相同的,二者是完全独立的。

尽管 Cold Observable 很好,但是对于某些事件不确定何时发生以及不确定 Observable 发射的元素数量,那还得使用 Hot Observable。比如:UI交互的事件、网络环境的变化、地理位置的变化、服务器推送消息的到达等等。

2.3 Clod Observable转Hot Observable

1. 使用publish,生成ConnectableObservable

使用 publish 操作符,可以让 Cold Observable 转换成 Hot Observable。它将原先的 Observable 转换成 ConnectableObservable。

        Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };

        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };

        Consumer<Long> subscriber3 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("      subscriber3: "+aLong);
            }
        };

        ConnectableObservable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread()).publish();
        observable.connect();

        observable.subscribe(subscriber1);
        observable.subscribe(subscriber2);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        observable.subscribe(subscriber3);

        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

注意,生成的 ConnectableObservable 需要调用connect()才能真正执行。

执行结果:

subscriber1: 0
   subscriber2: 0
subscriber1: 1
   subscriber2: 1
subscriber1: 2
   subscriber2: 2
      subscriber3: 2
subscriber1: 3
   subscriber2: 3
      subscriber3: 3
subscriber1: 4
   subscriber2: 4
      subscriber3: 4
subscriber1: 5
   subscriber2: 5
      subscriber3: 5
subscriber1: 6
   subscriber2: 6
      subscriber3: 6
subscriber1: 7
   subscriber2: 7
      subscriber3: 7
subscriber1: 8
   subscriber2: 8
      subscriber3: 8
subscriber1: 9
   subscriber2: 9
      subscriber3: 9
subscriber1: 10
   subscriber2: 10
      subscriber3: 10
subscriber1: 11
   subscriber2: 11
      subscriber3: 11

可以看到,多个订阅的 Subscriber 共享同一事件。
在这里,ConnectableObservable 是线程安全的。

2. 使用Subject/Processor

Subject 和 Processor 的作用是相同的。Processor 是 RxJava2.x 新增的类,继承自 Flowable 支持背压控制。而 Subject 则不支持背压控制。

        Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };

        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };

        Consumer<Long> subscriber3 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("      subscriber3: "+aLong);
            }
        };

        Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread());

        PublishSubject<Long> subject = PublishSubject.create();
        observable.subscribe(subject);

        subject.subscribe(subscriber1);
        subject.subscribe(subscriber2);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        subject.subscribe(subscriber3);

        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

执行结果跟上面使用 publish 操作符是一样的。

Subject 既是 Observable 又是 Observer(Subscriber)。

当 Subject 作为 Subscriber 时,它可以订阅目标 Cold Observable 使对方开始发送事件。同时它又作为Observable 转发或者发送新的事件,让 Cold Observable 借助 Subject 转换为 Hot Observable。

注意,Subject 并不是线程安全的,如果想要其线程安全需要调用toSerialized()方法。(在RxJava1.x的时代还可以用 SerializedSubject 代替 Subject,但是在RxJava2.x以后SerializedSubject不再是一个public class)
然而,很多基于 EventBus 改造的 RxBus 并没有这么做,包括我以前也写过这样的 RxBus

推荐阅读