rx-java - rxJava 与throttleFirst 运算符相反(不是节流而是收集)
问题描述
我需要跟随运算符,当元素到来时,启动计时器(为元素创建窗口)并将它们收集到List
orObservable/Flowable
中。当为计时器指定的时间结束并且没有元素到来时,操作员不会发送空事件。当下一个元素到来时,会创建新的计时器并开始收集元素。
Rx java 有 Buffer 和 Window 运算符,但这些运算符有缺点:
带有签名的缓冲区http://reactivex.io/documentation/operators/buffer.html
buffer(long timespan, TimeUnit unit)
几乎适合,但是当时间跨度中没有元素出现时,它会产生带有空列表的事件。带有签名的窗口http://reactivex.io/documentation/operators/window.html
window(long timespan, TimeUnit unit)
几乎适合,但是Observables/Flowables
当时间跨度中没有元素时它会产生空。
可以过滤这个空元素,但我想避免使用空事件(基于计时器)污染调度程序List/Observable-s/Flowable-s
。
我花了一些时间,发现形式上非常相似,但在功能上起到了相反的作用
throttleFirst(long windowDuration, TimeUnit unit)
https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleFirst.png
但不幸的是,它会节流,但不会收集物品。
解决方案
我不认为可以节省对象创建,因此如果您想避免空列表,请将它们过滤掉。
至于在新项目迟到时启动新的定期计时器,我想不出现有运营商的任何组合都可以在不丢失项目的情况下做到这一点。
我创建了以下“装置”,可以在不丢失物品的情况下做到这一点:
public static final class BufferWithTimeout<T> {
Scheduler.Worker trampoline = Schedulers.trampoline().createWorker();
final long timeout;
final TimeUnit unit;
final Scheduler.Worker worker;
final SerialDisposable timer = new SerialDisposable();
final PublishSubject<List<T>> output = PublishSubject.create();
List<T> current;
long bufferIndex;
BufferWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) {
this.worker = scheduler.createWorker();
this.timeout = timeout;
this.unit = unit;
}
void onValue(T value) {
trampoline.schedule(() -> {
if (timer.isDisposed()) {
return;
}
if (current == null) {
current = new ArrayList<>();
long bi = ++bufferIndex;
timer.set(worker.schedulePeriodically(() -> {
onTime(bi);
}, timeout, timeout, unit));
}
current.add(value);
});
}
void onTime(long index) {
trampoline.schedule(() -> {
if (index == bufferIndex && current != null) {
if (current.isEmpty()) {
current = null;
bufferIndex++;
timer.set(null);
} else {
output.onNext(current);
current = new ArrayList<>();
}
}
});
}
void onTerminate(Throwable error) {
timer.dispose();
worker.dispose();
trampoline.schedule(() -> {
if (current != null && !current.isEmpty()) {
output.onNext(current);
current = null;
}
if (error != null) {
output.onError(error);
} else {
output.onComplete();
}
});
}
void dispose() {
timer.dispose();
worker.dispose();
trampoline.schedule(() -> {
current = null;
});
}
public static <T> ObservableTransformer<T, List<T>> create(
long timeout, TimeUnit unit, Scheduler scheduler) {
return o ->
Observable.defer(() -> {
BufferWithTimeout<T> state = new BufferWithTimeout<>(
timeout, unit, scheduler);
return o
.doOnNext(v -> state.onValue(v))
.doOnError(e -> state.onTerminate(e))
.doOnComplete(() -> state.onTerminate(null))
.ignoreElements()
.<List<T>>toObservable()
.mergeWith(state.output.doOnDispose(state::dispose));
});
}
}
您可以通过以下方式尝试:
// generate events over time
Observable.fromArray(1, 2, 3, 5, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31)
.flatMap(v -> Observable.timer(v * 100, TimeUnit.MILLISECONDS).map(w -> v))
// apply operator
.compose(BufferWithTimeout.create(
700, TimeUnit.MILLISECONDS, Schedulers.computation()
))
// wait for it all
.blockingSubscribe(System.out::println);
请注意,虽然这会为每个源元素创建更多对象,但有办法绕过它,但它会变得更加复杂。
推荐阅读
- flask - 为什么 Flask-Compress 在 AWS EC2 上不起作用?
- spring-boot - 如何在spring boot中配置资源子目录
- google-api - 错误的用户数 Google Reporting API v3
- .net - x86 调试和发布版本之间的双精度值更改 1 位
- python - 如何让 PyCharm 识别自定义属性装饰器?
- java - 在 Hibernate 的实体方法中使用命名查询
- xamarin - Xamarin 表单 TabbedPage
- javascript - 如何在jquery中添加和删除属性单击事件?
- ios - 允许用户使用来自另一个 viewController 的文本创建 tableViewCell?
- android - 如何更改我的应用程序中所有按钮的文本颜色?