首页 > 解决方案 > 我可以捆绑在 Rxjava2 中的功能时间间隔内发布的项目吗?

问题描述

toList() 释放所有可以发布的项目,但是我想将发布的项目组合到某个时间并在那个时间发布。

这是我最初实现它的方式。

        val queue = LinkedBlockingQueue<Int>()
        val random = Random()
        val start = System.currentTimeMillis()

        (0..10).toObservable()
                .concatMap { item ->
                    Observable.just(item)
                            // Use delay to represent random work time.
                            .delay((random.nextLong() % 10) * 20, TimeUnit.MILLISECONDS)
                }
                .subscribe { item -> queue.add(item) }

        Observable.interval(50, TimeUnit.MILLISECONDS, Schedulers.io())
                .filter { queue.isNotEmpty() }
                .map {
                    val list = ArrayList<Int>()
                    var item = queue.poll()
                    while (item != null) {
                        list.add(item)
                        item = queue.poll()
                    }
                    list
                }
                .subscribe {
                    Log.i("Test", "${System.currentTimeMillis() - start} - ${it.joinToString(",")}")
                }

I/Test: 163 - 0,1,2
I/Test: 212 - 3
I/Test: 412 - 4
I/Test: 463 - 5,6,7
I/Test: 613 - 8,9
I/Test: 763 - 10

我使用扫描来创建一个单独的队列并避免让两个 Observable 引用它。

        (0..10).toObservable()
                .concatMap {
                    Observable.just(listOf(it))
                            // Use delay to represent random work time.
                            .delay((random.nextLong() % 10) * 20, TimeUnit.MILLISECONDS)
                }
                // Every 50 milliseconds, bundle items that have not yet been subscribed.
                .scan { t1: List<Int>, t2: List<Int> ->
                    t1 + t2
                }
                .debounce(50, TimeUnit.MILLISECONDS)
                //Outputs the published item.
                .subscribe {
                    Log.i("Test", "${System.currentTimeMillis() - start} - ${it.joinToString(",")}")
                }

I/Test: 160 - 0,1,2
I/Test: 261 - 0,1,2,3
I/Test: 442 - 0,1,2,3,4,5
I/Test: 623 - 0,1,2,3,4,5,6
I/Test: 743 - 0,1,2,3,4,5,6,7
I/Test: 819 - 0,1,2,3,4,5,6,7,8,9,10

每次发布时间的间隔也是不稳定的(使用queue时,每隔50ms输出,但使用scan()时,会出现70ms等意外的时间间隔),再次输出之前发布的item。

如果我想捆绑和发布在一定时间间隔内发布的物品,除了使用队列之外还有什么?

标签: rx-javarx-java2

解决方案


推荐阅读