首页 > 解决方案 > Kotlin 协程流中 RxJava .toList() 的等价物

问题描述

我有一种情况,我需要观察用户 ID,然后使用这些用户 ID 来观察用户。userIds 或 users 可以随时更改,我希望让发出的用户保持最新。这是我拥有的数据来源的示例:


data class User(val name: String)

fun observeBestUserIds(): Flow<List<String>> {
    return flow {
        emit(listOf("abc", "def"))
        delay(500)
        emit(listOf("123", "234"))
    }
}

fun observeUserForId(userId: String): Flow<User> {
    return flow {
        emit(User("${userId}_name"))
        delay(2000)
        emit(User("${userId}_name_updated"))
    }
}

在这种情况下,我希望排放量为:

[User(abc_name), User(def_name)], 然后

[User(123_name), User(234_name)], 然后

[User(123_name_updated), User(234_name_updated)]

我想我可以像这样在 RxJava 中实现这一点:

observeBestUserIds.concatMapSingle { ids ->
    Observable.fromIterable(ids)
        .concatMap { id ->
            observeUserForId(id)
        }
        .toList()
}

我会写什么函数来制作一个发出它的流?

标签: kotlinrx-javakotlin-coroutineskotlin-flow

解决方案


我相信您正在寻找combine,它为您提供了一个可以轻松调用的数组toList()

observeBestUserIds().collectLatest { ids ->
    combine(
        ids.map { id -> observeUserForId(id) }
    ) {
        it.toList()
    }.collect {
        println(it)
    } 
}

这是具有更明确参数名称的内部部分,因为您在 Stack Overflow 上看不到 IDE 的类型提示:

combine(
    ids.map { id -> observeUserForId(id) }
) { arrayOfUsers: Array<User> ->
    arrayOfUsers.toList()
}.collect { listOfUsers: List<User> ->
    println(listOfUsers)
}

输出:

[User(name=abc_name), User(name=def_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]

现场演示(请注意,在演示中,所有输出都会同时出现,但这是演示站点的限制 - 这些行的出现时间与您在本地运行代码时所期望的时间一致)

这避免了原始问题中讨论的 ( abc_name_updated, )。def_name_updated但是,仍然有一个中间发射,123_name_updated因为234_name123_name_updated首先发射,并且它立即发送组合版本,因为它们是每个流中的最新版本。

但是,可以通过消除发射来避免这种情况(在我的机器上,小到 1 毫秒的超时有效,但为了保守起见,我做了 20 毫秒):

observeBestUserIds().collectLatest { ids ->
    combine(
        ids.map { id -> observeUserForId(id) }
    ) {
        it.toList()
    }.debounce(timeoutMillis = 20).collect {
        println(it)
    }
}

它可以为您提供所需的确切输出:

[User(name=abc_name), User(name=def_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]

现场演示


推荐阅读