java - Rxjava 单元素缓冲区
问题描述
我有一个案例,有一个对象在随机时间发布对象,我想按每秒将它收集到缓冲区中,并通过某种策略(例如最大分数)进行过滤,以确保每个缓冲区中只有一个对象-第二。
subject
.buffer(1L, TimeUnit.SECONDS)
.filter {
isNotEmpty
}
.doOnNext {
// I get all object in the one second
// That waste too much memory, the non-max object shouldn't be put into the buffer
_.asScala.max(byScore)
}
.ignoreElements
.subscribeOn(Schedulers.io)
.subscribe
此代码将在一秒钟内保存所有对象并返回给我。
那不是我想要的。
有什么解决办法吗?
解决方案
您可以使用以下版本的buffer
运算符:
.buffer(long timespan, TimeUnit unit, Scheduler scheduler, int count,
Callable<U> bufferSupplier,
boolean restartTimerOnMaxSize)
它允许您定义bufferSupplier
用于存储缓冲值的自定义集合。然后,您可以创建您的集合的自定义版本,您最多存储一个项目,在我们的例子中,如果新的、更大的来替换现有值:
class SingleItemMaxCollection : ArrayList<Long>() {
override fun add(element: Long): Boolean {
return when {
size == 1 && get(0) < element -> { super.set(0, element); true }
size == 0 -> { super.add(element); true }
else -> false
}
}
}
演示,如何在一些模拟数据上使用它(每 400 毫秒发出的项目):
class SO65020891 {
private fun dataProvider() = Observable.just(1L, 2L, 3L, 4L, 5L, 6L)
.concatMap { Observable.just(it).delay(400, TimeUnit.MILLISECONDS) }
private fun getCollection(): () -> SingleItemMaxCollection = { SingleItemMaxCollection() }
fun getBufferedMax(): Observable<Long> {
return dataProvider()
.buffer(1, TimeUnit.SECONDS, Schedulers.computation(), 2, getCollection(), false)
.filter { it.isNotEmpty() }
.map { it[0] }
}
}
最后,进行一些验证:
class SO65020891Test {
@Test
fun maxEmittedValuesReturnedWithinWindows() {
val tested = SO65020891()
val values = tested.getBufferedMax().blockingIterable().toList()
assertEquals(listOf(2L, 4L, 6L), values)
}
}
推荐阅读
- akka - 在akka集群框架中使用Hazelcast MapListner
- sql - 在单个查询上填充空记录
- php - 从括号之间的文本中删除换行符
- node.js - 通过在 Docker 容器中缓存包来加速构建
- android - 让 Google 助理在特定状态下打开我的应用
- c# - 有没有办法保护所有现有行在 SQL 表中不被编辑?
- c - 如何让这个文件成为每行的第一个数字来增加它?那么第一行没有被跳过呢?在 C 中
- facebook - 发送到 Messenger 按钮未显示
- java - 使用 Jsoup 1.11 解析 XHTML
- laravel - 调用未定义的函数 App\Http\Controllers\appends()