How to subscribe to a Flowable in Android RxJava 2.0

Problem description

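I am new to RxJava on Android. I have successfully created an Observable, and now I want to create a Flowable. I am not sure whether a Flowable works the same way. Below is my Observable code, which works fine.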

 Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(final ObservableEmitter<String> e) throws Exception {
        //Use onNext to emit each item in the stream//
        t = new Timer();                                                                 
        t.scheduleAtFixedRate(
            new TimerTask() {
              public void run() {
                if (listObj.size() > 0) {
                  varToBeAddedInObservable = listObj.remove(listObj.size() - 1);
                  e.onNext(varToBeAddedInObservable);
                } else {
                  e.onComplete();
                  t.cancel();
                }
              }
            },
            0,      // run the first occurrence immediately
            1000);  // then repeat every 1000 ms
    }
});

/////////

 observable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            //.toFlowable(BackpressureStrategy.BUFFER)
            .subscribe(myObserver);

//////////////

     private Observer<String> getMyObserver() {
        return new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "Name: " + s);

                int colorIndex = random.nextInt(arrayColors.length);
                //Color color = (arrayColors[colorIndex]);
                textView.setTextColor(arrayColors[colorIndex]);
                // Show the emitted item itself rather than the shared field
                textView.setText(s);
                AlphaAnimation fadeIn = new AlphaAnimation(0.0f, 1.0f);
//                AlphaAnimation fadeOut = new AlphaAnimation( 1.0f , 0.0f ) ;
                // Configure the animation before starting it
                fadeIn.setDuration(1500);
                fadeIn.setFillAfter(true);
                textView.startAnimation(fadeIn);
                // textView.startAnimation(fadeOut);
             /*   fadeOut.setDuration(1200);
                fadeOut.setFillAfter(true);
                fadeOut.setStartOffset(1200 - fadeIn.getStartOffset());
                fadeOut.setAnimationListener(new Animation.AnimationListener() {
                    @Override
                    public void onAnimationStart(Animation animation) {

                    }

                    @Override
                    public void onAnimationEnd(Animation animation) {
                        textView.setTextColor(Color.BLACK);
                        textView.setText("Khallas !!!!");
                    }

                    @Override
                    public void onAnimationRepeat(Animation animation) {

                    }
                });*/

            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "All items are emitted!");
                textView.setTextColor(Color.BLACK);
                textView.setText("Finished !!!!");
            }

        };
    }

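The code above works fine. Now I am trying to do the same thing with a Flowable, but it does not let me subscribe. How should I do it? I read that something like a Disposable is needed to subscribe to a Flowable, but I could not find a suitable example. Thanks :)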

 private Flowable<String> getMyflowable() {
    return new Flowable<String>() {
        @Override
        protected void subscribeActual(Subscriber<? super String> s) {

        }


    };
}

Tags: android, rx-java2

Solution
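
A Flowable is subscribed to with an org.reactivestreams.Subscriber rather than an Observer, which is why passing the Observer from the question to subscribe() does not compile. A Subscriber's onSubscribe receives a Subscription instead of a Disposable, and items only arrive after request(n) has been called on it. The following is a minimal sketch of one way to do this, assuming RxJava 2.x (io.reactivex.rxjava2:rxjava) is on the classpath. The names FlowableSketch, createFlowable and collect are hypothetical, and the Timer from the question is replaced by a plain loop so the sketch runs outside Android. It uses Flowable.create instead of extending Flowable and overriding subscribeActual, and subscribes with a DisposableSubscriber, which requests Long.MAX_VALUE items in its default onStart() and can also be disposed.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.subscribers.DisposableSubscriber;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FlowableSketch {

    // Hypothetical helper: emits the list's items from last to first,
    // mirroring the order used in the question's Observable.
    static Flowable<String> createFlowable(final List<String> source) {
        return Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> e) throws Exception {
                List<String> items = new ArrayList<>(source);
                // Stop early if the subscriber has cancelled
                while (!items.isEmpty() && !e.isCancelled()) {
                    e.onNext(items.remove(items.size() - 1));
                }
                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER); // buffer items the subscriber has not requested yet
    }

    // Hypothetical helper: subscribes with a DisposableSubscriber and
    // records every signal it receives.
    static List<String> collect(Flowable<String> flowable) {
        final List<String> received = new ArrayList<>();
        DisposableSubscriber<String> subscriber =
                flowable.subscribeWith(new DisposableSubscriber<String>() {
                    @Override
                    public void onNext(String s) {
                        received.add(s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        received.add("error: " + t.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        received.add("done");
                    }
                });
        // No schedulers are applied, so the stream has already finished here;
        // dispose() is shown only to illustrate how to cancel.
        subscriber.dispose();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collect(createFlowable(Arrays.asList("a", "b", "c"))));
    }
}
```

On Android, the same chain would presumably keep the question's .subscribeOn(Schedulers.io()) and .observeOn(AndroidSchedulers.mainThread()) calls before subscribeWith, with the UI updates moved into onNext and onComplete. Alternatively, the existing Observable can be converted with toFlowable(BackpressureStrategy.BUFFER), as hinted at by the commented-out line in the question, and then subscribed to in the same way.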

