rx-java - 我可以捆绑在 Rxjava2 中的功能时间间隔内发布的项目吗?
问题描述
toList() 释放所有可以发布的项目,但是我想将发布的项目组合到某个时间并在那个时间发布。
这是我最初实现它的方式。
val queue = LinkedBlockingQueue<Int>()
val random = Random()
val start = System.currentTimeMillis()
(0..10).toObservable()
.concatMap { item ->
Observable.just(item)
// Use delay to represent random work time.
.delay((random.nextLong() % 10) * 20, TimeUnit.MILLISECONDS)
}
.subscribe { item -> queue.add(item) }
Observable.interval(50, TimeUnit.MILLISECONDS, Schedulers.io())
.filter { queue.isNotEmpty() }
.map {
val list = ArrayList<Int>()
var item = queue.poll()
while (item != null) {
list.add(item)
item = queue.poll()
}
list
}
.subscribe {
Log.i("Test", "${System.currentTimeMillis() - start} - ${it.joinToString(",")}")
}
I/Test: 163 - 0,1,2
I/Test: 212 - 3
I/Test: 412 - 4
I/Test: 463 - 5,6,7
I/Test: 613 - 8,9
I/Test: 763 - 10
我使用扫描来创建一个单独的队列并避免让两个 Observable 引用它。
(0..10).toObservable()
.concatMap {
Observable.just(listOf(it))
// Use delay to represent random work time.
.delay((random.nextLong() % 10) * 20, TimeUnit.MILLISECONDS)
}
// Every 50 milliseconds, bundle items that have not yet been subscribed.
.scan { t1: List<Int>, t2: List<Int> ->
t1 + t2
}
.debounce(50, TimeUnit.MILLISECONDS)
//Outputs the published item.
.subscribe {
Log.i("Test", "${System.currentTimeMillis() - start} - ${it.joinToString(",")}")
}
I/Test: 160 - 0,1,2
I/Test: 261 - 0,1,2,3
I/Test: 442 - 0,1,2,3,4,5
I/Test: 623 - 0,1,2,3,4,5,6
I/Test: 743 - 0,1,2,3,4,5,6,7
I/Test: 819 - 0,1,2,3,4,5,6,7,8,9,10
每次发布时间的间隔也是不稳定的(使用queue时,每隔50ms输出,但使用scan()时,会出现70ms等意外的时间间隔),再次输出之前发布的item。
如果我想捆绑和发布在一定时间间隔内发布的物品,除了使用队列之外还有什么?
解决方案
推荐阅读
- swift - 无法创建 UICollectionReusableView 的 IBOutlet
- scala - why DataFrame still there in spark 2.2 also even DataSet gives more performance in scala?
- c++ - 使用 std::fill 初始化结构对象
- swift - UIViewControllers 共享“通用”IBAction
- angular - 数据在延迟后加载到 UI 上
- mysql - 验证用户名是否已存在
- php - 如何从 PHP 的下拉菜单中预先选择一个选项
- python-3.x - 客户端连接后通过ws从其他类发送数据
- c# - 为什么 context.SaveChanges() 会更新表中的每一行?
- php - Laravel 在迁移时显示错误