首页 > 解决方案 > 如何使用 Rx Java 实现此要求

问题描述

我有一个包含(良好、非关键、关键)值的状态(枚举)

所以要求是:

  1. 当状态进入非关键状态时应该触发。
  2. 当状态进入临界状态时应该触发。
  3. 应在状态保持临界状态 15 秒时触发。

输入 :

publishSubject.onNext("Good")
publishSubject.onNext("Critcal") 
publishSubject.onNext("Critcal") 
publishSubject.onNext("NonCritical")  
publishSubject.onNext("Critacal") 
publishSubject.onNext("Critical") 
publishSubject.onNext("Good")
and so on...

请参阅代码结构以供参考:

    var publishSubject = PublishSubject.create<State>()
    publishSubject.onNext(stateObject)


    publishSubject
            /* Business Logic Required Here ?? */
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe {
                AppLogger.printLog("Trigger Success --> ")
            }

请帮助,在此先感谢,

标签: androidrx-javarx-java2rx-androidrx-kotlin

解决方案


您可以使用distinctUntilChanged()来抑制不更改状态的事件。使用 过滤掉正常事件filter()

当状态改变时,使用switchMap()操作符创建一个新的订阅。当状态为“临界”时,使用interval()运算符等待 15 秒。如果状态在那 15 秒内发生变化,switchMap()将取消订阅并重新订阅一个新的 observable。

publishSubject
  .distinctUntilChanged()
  .subscribeOn(Schedulers.computation())
  .observeOn(AndroidSchedulers.mainThread())
  .filter( state -> state != State.Normal )
  .switchMap( state -> {
                   if (state == State.Critical) {
                     return Observable.interval(0, 15, TimeUnit.SECONDS) // Note 1
                        .map(v -> State.Critical); // Note 2
                   }
                   return Observable.just( State.Noncritical );
                 })
  .subscribe( ... );
  1. interval()被赋予初始值0,使其立即发出一个值。几秒钟后15,将发出下一个值,依此类推。
  2. map()操作员将发出Long的 byinterval()变成

推荐阅读