RxKotlin: break out of doOnNext and call another function

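Problem description

I am using RxKotlin but do not fully understand it yet. I am trying to iterate over a list of queries and execute them one by one. The list contains a special string; once it is reached, I want to break out of the loop and run another function.

How can I do that in the example below?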

fun runQueries() {
    Observable.fromIterable(queriesTemp)
        .subscribeOn(Schedulers.computation())
        .doOnNext { query ->
            if (query.contains("COMPLETION OF SDF QUERIES")) {
                if (loginStatus == StaticVariables.FT_CASE_NEW_LOGIN) {
                    tasksQueriesTemp = arrayOfNulls(queries.size - queries.indexOf(query))
                    System.arraycopy(queries, queries.indexOf(query), tasksQueriesTemp, 0, tasksQueriesTemp!!.size)
                }
                // break the loop here
                runOtherQueries()
                break
            }
            if (!TextUtils.isEmpty(query)) {
                mDatabase.execSQL(query, false, "")
            }
            action(tasksQueriesTemp!!.indexOf(query))
        }
        .doOnComplete { executeOtherUpdates(tasksQueriesTemp) }
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe()
}

fun runOtherQueries() {
}

Tags: android, rx-java, rx-kotlin

Solution


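Factor the part you want to break on out of doOnNext and into takeWhile. Once the takeWhile predicate returns false, the operator stops emitting items and completes the stream: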

val broken = AtomicBoolean()  // java.util.concurrent.atomic.AtomicBoolean
Observable.fromIterable(queriesTemp)
    .subscribeOn(Schedulers.computation())
    .takeWhile { query ->
        if (query.contains("COMPLETION OF SDF QUERIES")) {
            if (loginStatus == StaticVariables.FT_CASE_NEW_LOGIN) {
                tasksQueriesTemp = arrayOfNulls(queries.size -
                    queries.indexOf(query))
                System.arraycopy(queries, queries.indexOf(query), 
                    tasksQueriesTemp, 0, tasksQueriesTemp!!.size)
            }
            // break the loop here 
            runOtherQueries()
            broken.set(true)
            return@takeWhile false  // labeled return from the lambda: ends the sequence, downstream then completes
        }
        return@takeWhile true
    }
    .doOnNext { query ->

        if (!TextUtils.isEmpty(query)) {
            mDatabase.execSQL(query, false, "")
        }
        action(tasksQueriesTemp!!.indexOf(query))
    }
    .doOnComplete { 
         // if you don't want to execute the other updates if the code
         // in takeWhile has "broken out of the loop"
         if (!broken.get())
             executeOtherUpdates(tasksQueriesTemp) 
    }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe()
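
To see the behaviour in isolation, here is a minimal sketch that runs synchronously (no schedulers) so the result can be inspected directly. It assumes RxJava 3 is on the classpath (`io.reactivex.rxjava3`); with RxJava 2 the import would be `io.reactivex.Observable`. The names `MARKER`, `QueryRun` and `runUntilMarker` are hypothetical and only stand in for the question's own fields and functions.

```kotlin
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical marker, mirroring the special string from the question.
const val MARKER = "COMPLETION OF SDF QUERIES"

// Hypothetical result holder, used only to make the outcome observable.
data class QueryRun(
    val executed: List<String>,  // items that reached doOnNext
    val brokenOut: Boolean,      // true if the marker was hit
    val completed: Boolean       // true if doOnComplete ran
)

fun runUntilMarker(queries: List<String>): QueryRun {
    val executed = mutableListOf<String>()
    val broken = AtomicBoolean(false)
    var completed = false

    Observable.fromIterable(queries)
        .takeWhile { query ->
            if (query.contains(MARKER)) {
                broken.set(true)
                // Returning false stops the sequence at this item.
                return@takeWhile false
            }
            true
        }
        // Only items that passed takeWhile arrive here.
        .doOnNext { query -> executed.add(query) }
        // takeWhile completes the stream even when it stops early,
        // which is why the answer checks the `broken` flag.
        .doOnComplete { completed = true }
        .subscribe()

    return QueryRun(executed, broken.get(), completed)
}

fun main() {
    val run = runUntilMarker(listOf("A", "B", "-- COMPLETION OF SDF QUERIES", "C"))
    println(run.executed)   // [A, B]
    println(run.brokenOut)  // true
    println(run.completed)  // true
}
```

The marker item itself and everything after it never reach doOnNext, yet doOnComplete still fires. That is the reason for guarding executeOtherUpdates with the flag when it should be skipped after an early exit.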
