首页 > 解决方案 > 组合多个 rx 源

问题描述

让我们想象一下以下情况,我们有一个从服务器端返回 Observable 对象源的函数:

private fun getStatistics(): Observable<TestStatistics> {
        return Observable
                .fromIterable(listOf(
                        TestStatistics(1.1, 1.2, 4),
                        TestStatistics(2.1, 2.2, 1),
                        TestStatistics(3.1, 3.2, 99)
                ))
                .delay(2, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
    }

TestStatistics 实体:

data class TestStatistics(val doubleCashBack: Double, val doubleAmount: Double, val currencyId: Int)

正如您在服务器响应中看到的,我们有 currencyId 将我们指向 Currency 实体:

data class TestCurrency(val currencyId: Int, val currencySign: String)

我们还有另一个函数,通过 id 从数据库返回货币实体的单一来源:

private fun getCurrencyById(id: Int): Single<TestCurrency> {
        return when (id) {
            1 -> Single.just(TestCurrency(1, "!"))
            2 -> Single.just(TestCurrency(2, "@"))
            3 -> Single.just(TestCurrency(3, "#"))
            else -> Single.error(Exception("Currency not found"))
        }
                .delay(1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
    }

主要思想是获取每个发出的 Statistic 实体,将其属性组合起来,然后发出具有属性和 Currency 的组合实体作为对象,所以问题来了,在这种情况下,我们必须先将 currencyId 取为从数据库中成功接收到的 Currency 对象而不是发出结果实体,所以结果类将如下所示:

data class TestDashboardStatistics(val count: Int, val cashBack: Double, val amount: Double, val testCurrency: TestCurrency)

但是我对这种可观察源的组合存在一些问题,服务器请求在一个线程中运行,数据库在另一个线程中运行,并且在第三个中组合代码,所以我必须确保我将处理从服务器接收到的所有统计信息,将忽略从数据库返回的所有错误(只有当我最终找到 Currency 时,如果所有请求都失败,我必须返回默认值),并且只会向数据库发出一个成功的请求,将把这个对象放在结果实体中,然后将它返回 Combing 函数可能如下所示:

private fun getCombinedStatistics(): Single<TestDashboardStatistics> {
        return Single.create<TestDashboardStatistics> {
            var transactionsAmount = 0.0
            var cashBackAmount = 0.0
            var count = 0
            var currency = TestCurrency(-1, "default")

            getStatistics().subscribe({ statistic ->
                ++count
                transactionsAmount += statistic.doubleAmount
                cashBackAmount += statistic.doubleCashBack
                getCurrencyById(statistic.currencyId).subscribe({ cur ->
                    // TODO do not request currency for future statistics because we have it now but
                    // TODO because different threads we can subscribe for new request before we will receive this result
                    currency = cur
                }, { err ->
                    // TODO ignore error if there is a hope that other statistics will have valid currency code
                })
            }, {
                // On requesting statistics error just throw it up
                Single.error<TestDashboardStatistics>(it)
            }, {
                // When all statistics will be received and precessed emit result
                // But it could be called even before we will receive any response from database
                Single.just(TestDashboardStatistics(count, cashBackAmount, transactionsAmount, currency))
            })
        }
    }

我想到的一个解决方案是从数据库请求货币以某种方式阻塞处理统计信息,因此处理将等到数据库请求完成,然后再进行另一个,但是我对 Rx 运算符的了解非常糟糕,所以我不知道知道我该怎么做。

标签: javamultithreadingkotlinrx-javareactive-programming

解决方案


我建议按照您的建议保留 db 请求阻塞:

data class TestStatistics(val doubleCashBack: Double, val doubleAmount: Double, val currencyId: Int)
data class TestCurrency(val currencyId: Int, val currencySign: String)
data class TestDashboardStatistics(val count: Int?, val cashBack: Double, val amount: Double, val testCurrency: TestCurrency)

object Helloworld {
    private fun getStatistics(): Observable<TestStatistics> {
        return Observable
            .fromIterable(listOf(
                TestStatistics(1.1, 1.2, 4),
                TestStatistics(2.1, 2.2, 1),
                TestStatistics(3.1, 3.2, 99),
                TestStatistics(4.1, 4.3, 2),
                TestStatistics(5.1, 5.3, 3)
            ))
            .delay(2, TimeUnit.SECONDS)
    }

    private fun getCurrencyById(id: Int): TestCurrency? {
        // blocking call
        return when (id) {
            1 -> TestCurrency(1, "!")
            2 -> TestCurrency(2, "@")
            3 -> TestCurrency(3, "#")
            else -> null
        }
    }

    @JvmStatic
    fun main(args: Array<String>) {
        getStatistics()
            .map { getCurrencyById(it.currencyId) to it }
            .filter { it.first != null }
            .map { TestDashboardStatistics(null, it.second.doubleCashBack, it.second.doubleAmount, it.first!!) }
            .subscribe { println(it) }

        Thread.sleep(5000)
    }
}

我使该count字段可以为空,因为我不完全理解您要实现的目标。

我还建议您省略subscribeOn辅助方法中的调用并将它们放入main方法中(连同observeOn()函数),在其中链接您的业务逻辑。通过这种方式,您可以在不同操作之间切换线程(例如,在线程上订阅、在ui线程上进行数据库调用、在io线程上执行繁重的算法computation等)

希望这可以帮助 :)

PS据我了解您的用例,您只需要一个简单的map操作:TestStatistics-> TestDashboardStatistics。如果您不想TestCurrency每次都访问数据库,您可以缓存已经获取的实例(使用Map??)。


推荐阅读