首页 > 解决方案 > 如何使用 RxJava 对来自异步源的数据进行分组

问题描述

我正在开发一个交易应用程序。当用户选择一些产品时,我需要为每个产品显示市场是否开放及其最新价格。用户可以选择例如 2 个产品,我要做的是仅在我拥有 2 个产品的所有信息时才显示数据。数据可以随时更改(即其中一种产品的市场关闭)。这是我的代码:

data class ProductInfo(
    val productCode: String,
    val isMarketOpen: Boolean,
    val latestPrice: BigDecimal,
)

// This observable could emit at any time due to user interaction with the UI
private fun productsCodeObservable(): Observable<List<String>> = Observable.just(listOf("ProductA", "ProductB"))

// Markets have different working hours
private fun isMarketOpenObservable(productCode: String): Observable<Boolean> {
    return Observable.interval(2, TimeUnit.SECONDS)
            .map {
                // TODO: Use API to determine if the market is open for productCode
                it.toInt() % 2 == 0
            }
}

// The product price fluctuates so this emits every X seconds
private fun latestPriceObservable(productCode: String): Observable<BigDecimal> {
    return Observable.interval(2, TimeUnit.SECONDS)
            .map { productPrice -> productPrice.toBigDecimal() }
}

@Test
fun `test`() {

    val countDownLatch = CountDownLatch(1)

    productsCodeObservable()
            .switchMap { Observable.fromIterable(it) }
            .flatMap { productCode ->
                Observable.combineLatest(
                        isMarketOpenObservable(productCode),
                        latestPriceObservable(productCode)
                ) { isMarketOpen, latestPrice ->
                    ProductInfo(productCode, isMarketOpen, latestPrice)
                }
            }
            .toList()
            .doOnSuccess { productsInfo ->
                println(productsInfo)
            }
            .subscribe()

    countDownLatch.await()
}

我不知道问题是什么,因为测试方法从不打印任何东西。我对 RxJava 了解不多,但我的理解是 toList 不起作用,因为源 observables 永远不会完成。关于如何收集产品代码的数据并在任何数据更改时发出列表的任何想法?:)

标签: javaandroidkotlinrx-javareactive-programming

解决方案


如果您想在每次这些产品发生变化时收到新的产品信息列表:

productsCodeObservable()
    .switchMap { list ->
        val productInfoObservables = list.map { productCode ->
            Observable.combineLatest(
                isMarketOpenObservable(productCode),
                latestPriceObservable(productCode)
            ) { isMarketOpen, latestPrice ->
                ProductInfo(productCode, isMarketOpen, latestPrice)
            }
        }
        Observable.combineLatest(productInfoObservables) { it.toList() as List<ProductInfo> }
    }
    .doOnNext { productsInfoList ->
        println(productsInfoList)
    }
    .subscribe()

推荐阅读