首页 > 解决方案 > kotlin 协程,定期发出数据并检查订阅者数量

问题描述

我在 Spring Boot 上有带有 rSocket 服务的服务器:

@MessageMapping("get-messages")
fun getMessageById(): Flow<Set<Message>> {
    return flow { emit(service.getLatestMessages()) }
}

因为回购不是反应性的,我想定期去数据库获取数据并将其提供给订阅者(如果存在)。

我想像这样使用 StateFlow:

private val stateFlowMessages = MutableStateFlow<Set<Message>>(emptySet())
init {
    CoroutineScope(Dispatchers.IO).launch {
        while(true){
            if (stateFlowProducts.subscriptionCount.value > 0) 
                stateFlowProducts.value = service.getLatestMessages()
            delay(6 * 1000)
        }
    }
}

但是订阅者总是0,我认为“延迟”的“同时”不是最佳做法?

标签: kotlin-coroutinesflow

解决方案


0. subscriptionCount: `0       1      2     0     2`
1. Map to true/false: `false  true  true false true`
2. Distinct.        : `false  true       false true`
3. Filter.          : `       true             true`
3. MapLatest.       : `         list             list`.
stateFlowProducts.subscriptionCount
 .map { it > 0 }
 .distinctUntilChanged()
 .filter { it }
 .mapLatest { service.getLatestMessages() }
 .onEach { stateFlowProducts.value = it }
 .launchIn(scope)

https://pl.kotl.in/nQBp0zW6s


推荐阅读