首页 > 解决方案 > Rxjava 单元素缓冲区

问题描述

我有一个案例,有一个对象在随机时间发布对象,我想按每秒将它收集到缓冲区中,并通过某种策略(例如最大分数)进行过滤,以确保每个缓冲区中只有一个对象-第二。

subject
    .buffer(1L, TimeUnit.SECONDS)
    .filter {
        isNotEmpty
    }
    .doOnNext {
        // I get all object in the one second
        // That waste too much memory, the non-max object shouldn't be put into the buffer
        _.asScala.max(byScore)
    }
    .ignoreElements
    .subscribeOn(Schedulers.io)
    .subscribe

此代码将在一秒钟内保存所有对象并返回给我。

那不是我想要的。

有什么解决办法吗?

标签: javakotlinrx-javarx-java2

解决方案


您可以使用以下版本的buffer运算符:

.buffer(long timespan, TimeUnit unit, Scheduler scheduler, int count,
       Callable<U> bufferSupplier,
       boolean restartTimerOnMaxSize)

它允许您定义bufferSupplier用于存储缓冲值的自定义集合。然后,您可以创建您的集合的自定义版本,您最多存储一个项目,在我们的例子中,如果新的、更大的来替换现有值:

class SingleItemMaxCollection : ArrayList<Long>() {

    override fun add(element: Long): Boolean {
        return when {
            size == 1 && get(0) < element -> { super.set(0, element); true }
            size == 0 -> { super.add(element); true }
            else -> false
        }
    }
}

演示,如何在一些模拟数据上使用它(每 400 毫秒发出的项目):

class SO65020891 {

    private fun dataProvider() = Observable.just(1L, 2L, 3L, 4L, 5L, 6L)
        .concatMap { Observable.just(it).delay(400, TimeUnit.MILLISECONDS) }

    private fun getCollection(): () -> SingleItemMaxCollection = { SingleItemMaxCollection() }

    fun getBufferedMax(): Observable<Long> {
        return dataProvider()
            .buffer(1, TimeUnit.SECONDS, Schedulers.computation(), 2, getCollection(), false)
            .filter { it.isNotEmpty() }
            .map { it[0] }
    }
}

最后,进行一些验证:

class SO65020891Test {

    @Test
    fun maxEmittedValuesReturnedWithinWindows() {
        val tested = SO65020891()

        val values = tested.getBufferedMax().blockingIterable().toList()

        assertEquals(listOf(2L, 4L, 6L), values)
    }
}

推荐阅读