rxjs - 每秒缓冲 N 个值 RxJava,Project Reactor
问题描述
我有一个带有一些值的流:
Flux<Integer> stream = getStream();
我试图实现每秒 N 个项目的功能
stream.bufferTimeout(MAX_SIZE_TWO, _1_SECOND).subscribe(val => {
System.out.println(val);
});
我正在尝试找到接近我预期结果的运营商。
预期结果:
time: 15:00:00, stream_next_value: 1, output: {1}
time: 15:00:00, stream_next_value: 2, output: {2}
time: 15:00:00, stream_next_value: 3, no output => buffer
time: 15:00:00, stream_next_value: 4, no output => buffer
time: 15:00:00, stream_next_value: 5, no output => buffer
time: 15:00:01, stream_no_next_value, output: {3,4}
time: 15:00:01, stream_next_value: 6, no output => buffer
time: 15:00:02, stream_no_next_value, output: {5,6}
但看起来缓冲区运算符的重载版本不支持这种行为。
如何使用缓冲区运算符实现预期行为?
解决方案
也许你可以这样做:
Flowable<Long> stream = Flowable.generate(() -> 0L, (next, emitter) -> {
emitter.onNext(next);
return next + 1;
});
// Flowable<Long> stream = Flowable.interval(100, MILLISECONDS);
// .onBackpressureDrop(); // to make it works otherwise get a MissingBackPressureException
stream.buffer(2)
.zipWith(Flowable.interval(1, SECONDS), (first, second) -> first)
.flatMap(Flowable::fromIterable)
.subscribe(s -> LOGGER.info("received: " + s),
Throwable::printStackTrace);
当心stream
必须尊重背压,否则您必须添加一个onBackpressureXXX()
运算符(例如,如果流是一个interval()
(参见注释代码),情况就是如此)。你会得到这样的输出:
14:39:59.538 | INFO | RxComputationThreadPool-1 | received: 0
14:39:59.540 | INFO | RxComputationThreadPool-1 | received: 1
14:40:00.527 | INFO | RxComputationThreadPool-1 | received: 2
14:40:00.528 | INFO | RxComputationThreadPool-1 | received: 3
14:40:01.528 | INFO | RxComputationThreadPool-1 | received: 4
14:40:01.528 | INFO | RxComputationThreadPool-1 | received: 5
14:40:02.528 | INFO | RxComputationThreadPool-1 | received: 6
14:40:02.528 | INFO | RxComputationThreadPool-1 | received: 7
14:40:03.528 | INFO | RxComputationThreadPool-1 | received: 8
14:40:03.528 | INFO | RxComputationThreadPool-1 | received: 9
推荐阅读
- c# - 属性未通过 newtonsoft 在 Json 中序列化
- postgresql - Postgres SETOF 到没有子查询的数组
- javascript - Typescript:通过 React 的 useCallback 传递推断类型
- .net - 返回 Null 的对象:线程执行
- django - 名为“user”的 URL 关键字参数。修复您的 URL 配置,或正确设置视图上的 `.lookup_field` 属性
- javascript - 有没有办法让构建的 Vue.js 应用程序读取 Kubernetes 部署中定义的环境变量?
- flutter - 颤振基于圆半径调整/缩放谷歌地图缩放级别
- resize - 如何在 tmux send-keys 命令中使用内部变量?
- electron - 防止用户在电子中退出全屏模式到窗口模式?
- c - C语言中%lg的含义