首页 > 技术文章 > 零基础快速搭建rxjava框架

lanlengran 2018-08-29 09:21 原文

基本概念

定义

RxJava 是一个 基于事件流、实现异步操作的库

原理

角色 作用 类比
被观察者(Observable) 产生事件 顾客
观察者(Observer) 接收事件,并给出响应动作 厨房
订阅(Subscribe) 连接 被观察者 & 观察者 服务员
事件(Event) 被观察者 & 观察者 沟通的载体 菜式

即RxJava原理可总结为:被观察者 (Observable) 通过 订阅(Subscribe) 按顺序发送事件 给观察者 (Observer), 观察者(Observer) 按顺序接收事件 & 作出对应的响应动作。具体如下图:
image
(图片和表格来自Carson_Ho博客)

集成

在build中添加

    implementation 'io.reactivex.rxjava2:rxjava:2.2.0'
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.0'

最简单的形式

    /**
     * 最基础的订阅
     */
    private void test1() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d("qin","开始采用subscribe连接");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d("qin","对Next事件"+integer+"做出响应");
            }

            @Override
            public void onError(Throwable e) {
                Log.d("qin","对error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d("qin","对onComplete事件做出响应");
            }
        });
    }
  1. 在这个函数中首先通过Observable.create创建了一个被观察者,然后被观察者发送了四个event事件,包含三个onNext和一个onComplete事件。
  2. 之后通过new Observer()创建了一个被观察者,其中分别对四种事件进行接收。
  3. 最后通过subscribe将二者连接到了一起。

常用基本操作符

fromArray和fromIterable

作用

直接发送数组(fromArray)和列表(fromIterable)

使用场景

可以实现数组或者列表的便利,可以替代for循环

实例代码:
 Observable.fromArray(new Integer[]{0,1,2,3,4}).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d("qin","开始采用subscribe连接");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d("qin","对Next事件"+integer+"做出响应");
            }

            @Override
            public void onError(Throwable e) {
                Log.d("qin","对error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d("qin","对onComplete事件做出响应");
            }
        });
        
        
        ArrayList<Integer> list=new ArrayList();
        for (int i=0;i<10;i++){
            list.add(i);
        }
        Observable.fromIterable(list).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d("qin","开始采用subscribe连接");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d("qin","对Next事件"+integer+"做出响应");
            }

            @Override
            public void onError(Throwable e) {
                Log.d("qin","对error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d("qin","对onComplete事件做出响应");
            }
        });

interval

作用

定时发送事件,类型timer

使用场景

可以替代timer等定时任务,例如每隔一段时间去重新拉取数据

实例代码:
  /**
          参数1 = 第1次延迟时间;
          参数2 = 间隔时间数字;
          参数3 = 时间单位;
         */
        Observable.interval(3,1, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d("qin","开始采用subscribe连接");
            }

            @Override
            public void onNext(Long integer) {
                Log.d("qin","对Next事件"+integer+"做出响应");
            }

            @Override
            public void onError(Throwable e) {
                Log.d("qin","对error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d("qin","对onComplete事件做出响应");
            }
        });

map和flatmap

作用

将获取到的事件转换成新的事件(map)或者转换成新的观察者(flatmap)

使用场景
  1. 数据类型转换,例如将罗马数字1234转换成中文数字一二三四。或者从json中取出特定的数据。
  2. 可以将两个观察者串在一起。例如我在第一个接口中获取了用户信息,但是只包含userid用户姓名需要第二个接口去根据userid去查。就可以使用flatmap去实现串联操作
  3. 可以结合上面的interval操作符,实现定时拉取数据
实例代码:
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "使用 Map变换操作符 将事件" + integer + "的参数从 整型" + integer + " 变换成 字符串类型" + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d("qin", s);
            }
        });
        
        
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("我是事件" + integer + "拆分后的子事件" + i);
                }
                return Observable.fromIterable(list);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d("qin", s);
            }
        });

zip

作用

合并 多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合过后的事件序列),并最终发送

使用场景
  1. 两个接口没有关联关系,但是都需要调用。那么就可以使用zip来进行并发操作。例如我拥有用户的id,需要查询用户的姓名(第一个接口)和用户的家庭住址(第二个接口)。按照原来的程序逻辑我们需要先查询用户姓名,成功以后再查询用户地址,最后显示。如果使用zip,我们就可以同时创建两个被观察者一个获取姓名一个获取地址,并发操作,最后再zip中进行合并,在观察者中显示。
实例代码:
  Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "被观察者发送了事件1");
                emitter.onNext(1);
                Thread.sleep(1000);

                Log.d(TAG, "被观察者发送了事件2");
                emitter.onNext(2);
                Thread.sleep(1000);

                Log.d(TAG, "被观察者发送了事件3");
                emitter.onNext(3);
                Thread.sleep(1000);

                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io());

        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(TAG, "被观察者2发送了事件1");
                emitter.onNext("A");
                Thread.sleep(1000);

                Log.d(TAG, "被观察者2发送了事件2");
                emitter.onNext("B");
                Thread.sleep(1000);

                Log.d(TAG, "被观察者2发送了事件3");
                emitter.onNext("C");
                Thread.sleep(1000);

                Log.d(TAG, "被观察者2发送了事件4");
                emitter.onNext("D");
                Thread.sleep(1000);
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.newThread());

        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {

                return integer + s;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "最终接收到的事件 =  " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

subscribeOn和observeOn

作用

subscribeOn :指定Observable自身在哪个调度器上执行,也就是指定被观察者运行的线程

observeOn :指定一个观察者在哪个调度器上观察这个Observable,也就是指定观察者运行的线程

使用场景
  1. 在程序中,我们经常需要在非主线程进行网络操作,在主线程更新ui,就可以使用这两个操作符
实例代码:
 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d("qin", "运行的线程是" + Thread.currentThread().getName());
                Thread.sleep(2000);
                emitter.onNext(1);
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d("qin", "运行的线程是" + Thread.currentThread().getName());
                        Log.d(TAG, "onNext: " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    // 接收合并事件后,统一展示
                    @Override
                    public void onComplete() {
                        Log.d(TAG, "获取数据完成");
                        Log.d(TAG, result);
                    }
                });

compose

作用

可以将一种类型的Observable转换成另一种类型的Observable

使用场景
  1. 可以将一组操作符重用于多个数据流中。例如,因为希望在工作线程中处理数据,然后在主线程中处理结果,所以我会频繁使用subscribeOn()和observeOn(),但是这是非常繁琐的,我们就可以使用compose来处理
实例代码:
//我们可以把上个例子的代码变为
     Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d("qin", "运行的线程是" + Thread.currentThread().getName());
                Thread.sleep(2000);
                emitter.onNext(1);
                emitter.onComplete();
            }
        }).compose(rxSchedulerHelper())
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Object value) {
                        Log.d("qin", "运行的线程是" + Thread.currentThread().getName());
                        Log.d(TAG, "onNext: " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    // 接收合并事件后,统一展示
                    @Override
                    public void onComplete() {
                        Log.d(TAG, "获取数据完成");
                        Log.d(TAG, result);
                    }
                });
                
    /**
     * 统一线程处理
     * @param <T> 指定的泛型类型
     * @return ObservableTransformer
     */
    public static <T> ObservableTransformer<T, T> rxSchedulerHelper() {

        return new ObservableTransformer<T, T>() {
            @Override
            public ObservableSource<T> apply(Observable<T> upstream) {
                return upstream.subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
    }

retrofit和rxjava的结合

依赖包

如果让retrofit支持rxjava的调用需要依赖下面两个包

// 衔接 Retrofit & RxJava
// 此处一定要注意使用RxJava2的版本
    compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'

// 支持Gson解析
    compile 'com.squareup.retrofit2:converter-gson:2.1.0'

最基本网络访问

首先是新建retrofit,这里封装了一个方法。

    /**
     * 新建需要进行网络访问retrofit
     */
    public <T> T getRx(Class<T> service) {
       String baseUrl =  HttpURLConstant.getURL(0);
        LogUtil.i("provideRetrofit()  retrofit will new Retrofit.Builder() baseUrl:" + baseUrl);
        retrofit = new Retrofit.Builder() //每次请求都要求新建
                .baseUrl(baseUrl)
                .addConverterFactory(ScalarsConverterFactory.create())
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .client(AppModule.provideOkHttpClient())
                .build();
        return retrofit.create(service);
    }

可以看到与不使用rxjava的retrofit相比多了一句" .addCallAdapterFactory(RxJava2CallAdapterFactory.create())" 这句是必须的,否则rxjava会创建报错

接下来我们需要编写retrofit的访问方法

//不使用rxjava的时候retrofit的写法
        @POST(HttpURLConstant.URL_CHECK_REGISTED)
        Call<String> checkRegisted(@Body Map<String,Object> jsonString);
//使用rxjava的时候retrofit的写法
        @POST(HttpURLConstant.URL_CHECK_REGISTED)
        Observable<String> checkObservableRegisted(@Body Map<String,Object> jsonString);

很明显就可以发现,二者最大的不同就是返回的类型由call变成了Observable。这样联系上面的知识,我们就可以知道,我们获取了retrofit的observable之后,就可以去订阅,获取访问结果.

所以,最后我们获取到retrofit的Observable的完整的方法是

    /**
     * 校验是否已注册
     *
     * @param phoneNum
     * @param
     * @return
     */
    public Observable<String> checkRegistedObservableString(String phoneNum) {
        Map<String, Object> obj = new HashMap<>();
        if (!TextUtils.isEmpty(phoneNum)) {
            obj.put("telephone", phoneNum);
        }

        Observable observable = getRx(HttpTalkForRetrofit.UserApi.class).checkRegistedObservableString(obj);

        return observable;
    }

然后我们在调用的地方这样写

 new HttpManagerUser().checkRegistedObservableString(phone)
 .compose(RxUtils.rxSchedulerHelper())
 .subscribe(new ResourceObserver<String>() {
            @Override
            public void onNext(String value) {
                Log.d("qin","value"+value);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {

            }
        });

这样我们就可以在onNext中获取访问的结果,如果访问出错那么就会onError的回调。这样我们就完成了一个最基本的网络访问的请求。

对网络返回结果进行过滤

我们在网络请求的时候经常会出现,服务端返回的数据虽然走的是正确的接口,但是里面的数据是没有的。所以我们希望滤掉这样的信息,只保留获取成功的数据,在我们的项目中也就是state=0的数据。

如果要实现这样的功能,那么我们就需要和服务端约定一个基类。例如在我们现在的工程中基类如下

public class HttpResponseBase<T> {
    private int status;
    private String description;
    private int total;
    private T data;
        
    /**
    这里是get,set方法,就不列出来了
    **/
}

然后在例子中,data的类是

public class CheckUserExistsBean {

    /**
     * registered : true
     */

    private boolean registered;

    public boolean isRegistered() {
        return registered;
    }

    public void setRegistered(boolean registered) {
        this.registered = registered;
    }
}

接下来,我们就可以改造之前的获取retrofit的Observable的方法。由string改成具体的bean。

    /**
     * 校验是否已注册
     *
     * @param phoneNum
     * @param
     * @return
     */
    public Observable<HttpResponseBase<CheckUserExistsBean>> checkObservableRegisted(String phoneNum) {
        Map<String, Object> obj = new HashMap<>();
        if (!TextUtils.isEmpty(phoneNum)) {
            obj.put("telephone", phoneNum);
        }

        Observable observable = getRx(HttpTalkForRetrofit.UserApi.class).checkObservableRegisted(obj);

        return observable;
    }
    
        @POST(HttpURLConstant.URL_CHECK_REGISTED)
        Observable<HttpResponseBase<CheckUserExistsBean>> checkObservableRegisted(@Body Map<String,Object> jsonString);

这样,只要接口返回的数据正确,我们就可以直接获取到对应bean。而不是string。接下来,我们只需要在最后的观察者收到数据直接拦截state不为0的数据即可,这里我们就可以使用前面讲过flatmap方法。所以调用接口方法可以改成这样

  new HttpManagerUser().checkObservableRegisted(phone).compose(RxUtils.rxSchedulerHelper())
                .flatMap(new Function<HttpResponseBase<CheckUserExistsBean>, ObservableSource<CheckUserExistsBean>>() {
                    @Override
                    public ObservableSource<CheckUserExistsBean> apply(HttpResponseBase<CheckUserExistsBean> httpResponseBase) throws Exception {
                        if(httpResponseBase.getStatus() == 0) {
                            return  Observable.create(new ObservableOnSubscribe<CheckUserExistsBean>() {
                                @Override
                                public void subscribe(ObservableEmitter<CheckUserExistsBean> emitter) throws Exception {
                                    try {
                                        emitter.onNext(httpResponseBase.getData());
                                        emitter.onComplete();
                                    } catch (Exception e) {
                                        emitter.onError(e);
                                    }
                                }
                            });

                        } else {
                            return Observable.error(new ServerException(httpResponseBase.getStatus(),httpResponseBase.getDescription()));
                        }
                    }
                }).subscribe(new ResourceObserver<CheckUserExistsBean>() {
            @Override
            public void onNext(CheckUserExistsBean value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

但是这样功能虽然实现了,很明显代码很臃肿,而且不能复用,每个接口都需要写这么一大坨。所以这时候compose就派上用处了。我们可以把代码拆分成这样

       new HttpManagerUser().checkObservableRegisted(phone).compose(RxUtils.rxSchedulerHelper())
                .compose(RxUtils.handleResult())
                .subscribe(new ResourceObserver<CheckUserExistsBean>() {
            @Override
            public void onNext(CheckUserExistsBean value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
        
        //RxUtils中
        
            /**
     * 统一返回结果处理
     * @param <T> 指定的泛型类型
     * @return ObservableTransformer
     */
    public static <T> ObservableTransformer<HttpResponseBase<T>, T> handleResult() {
        ObservableTransformer observableTransformer= new ObservableTransformer<HttpResponseBase<T>, T>() {
            @Override
            public Observable apply(Observable<HttpResponseBase<T>> httpResponseObservable) {

               return httpResponseObservable.flatMap(new Function<HttpResponseBase<T>, Observable<T>>() {
                    @Override
                    public Observable<T> apply(HttpResponseBase<T> httpResponseBase) throws Exception {
                        if(httpResponseBase.getStatus() == 0) {
                            return createData(httpResponseBase.getData());
                        } else {
                            return Observable.error(new ServerException(httpResponseBase.getStatus(),httpResponseBase.getDescription()));
                        }
                    }
                });
            }
        };
        return observableTransformer;
    }
    
        /**
     * 得到 Observable
     * @param <T> 指定的泛型类型
     * @return Observable
     */
    private static <T> Observable<T> createData(final T t) {
        return Observable.create(emitter -> {
            try {
                emitter.onNext(t);
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        });
    }

这样我们就成功的筛选出了stata为0的数据,并且无论哪个接口只需要添加一句

.compose(RxUtils.handleResult())

就可以实现数据的过滤

对网络访问错误进行处理

同样的,如果网络访问出错,例如网络错误,服务器返回了错误的数据,而且在上面我们把stata不为0的数据也抛到错误中了,如果我们分别在每个错误的回调中处理,这样的处理方法不仅低效,而且臃肿了。所以和上面一样,我们也使用compose来进行统一的处理。把网络访问的错误信息直接转换成用户可以理解的。
在RxUtils中添加错误的处理方法

    /**
     * 统一返回结果处理
     * @param <T> 指定的泛型类型
     * @return ObservableTransformer
     */
    public static <T> ObservableTransformer<T,T> handleError() {
        ObservableTransformer observableTransformer= new ObservableTransformer<T, T>() {
            @Override
            public ObservableSource<T> apply(Observable<T> upstream) {
                return upstream.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends T>>() {
                    @Override
                    public ObservableSource<? extends T> apply(Throwable throwable) throws Exception {
                        return Observable.error(ExceptionEngine.handleException(throwable));
                    }
                });
            }
        };
        return observableTransformer;
    }

新建一个错误处理类ExceptionEngine(这里仅仅用作例子,需要根据实际情况更改)

public class ExceptionEngine {
    public static final int UNKNOWN = 1000;
    public static ApiException handleException(Throwable e){
        ApiException ex;
        if (e instanceof HttpException){             //HTTP错误

            org.xutils.ex.HttpException httpException = (org.xutils.ex.HttpException) e;
            ex = new ApiException(e, httpException.getCode());
            ex.message=ErrorManager.getToastErrorMsg(httpException.getCode());
            return ex;
        } else if (e instanceof ServerException){    //服务器返回的错误
            ServerException resultException = (ServerException) e;
            ex = new ApiException(resultException, resultException.code);
            ex.message = ErrorManager.getToastErrorMsg(resultException.code);
            return ex;
        } else if (e instanceof JsonParseException
                || e instanceof JSONException
                || e instanceof ParseException){
            ex = new ApiException(e, HttpListener.ERROR_EXCEPTION);
//            ex.message = "解析错误";            //均视为解析错误
            ex.message = ErrorManager.getToastErrorMsg(HttpListener.ERROR_EXCEPTION);
            return ex;
        }else if(e instanceof ConnectException){
            ex = new ApiException(e, HttpListener.ERROR_GATEWAY_TIMEOUT);
//            ex.message = "连接失败";  //均视为网络错误
            ex.message = ErrorManager.getToastErrorMsg(HttpListener.ERROR_GATEWAY_TIMEOUT);
            return ex;
        }else {
            ex = new ApiException(e, UNKNOWN);
//            ex.message = "未知错误";          //未知错误
            ex.message = ErrorManager.getToastErrorMsg(UNKNOWN);
            return ex;
        }
    }
}

同时新建一种错误类型ApiException

public class ApiException extends Exception {
    public int code;
    public String message;

    public ApiException(Throwable throwable, int code) {
        super(throwable);
        this.code = code;

    }
}

好了现在万事具备,我们只需要在我们的获取数据方法中使用这个方法就行了

                   new HttpManagerUser().checkObservableRegisted(phone).compose(RxUtils.rxSchedulerHelper())
                            .compose(RxUtils.handleResult())
                            .compose(RxUtils.handleError())
                            .subscribeWith(new ResourceObserver<CheckUserExistsBean>() {
                                @Override
                                public void onNext(CheckUserExistsBean value) {

                                    if (value.isRegistered()) {
                                        //如果是已经注册的手机号
                                        view.toVelidateActivity();
                                    } else {
                                        //如果是新用户
                                        view.toRegisterctivity();
                                    }
                                }

                                @Override
                                public void onError(Throwable e) {
                                    showErrorMsg(e);

                                }

                                @Override
                                public void onComplete() {
                                    view.hindDialog();
                                }
                            })

这样我们就基本上完成了比较简单和完善的rxjava访问的框架了

推荐阅读