首页 > 解决方案 > rxJava 与throttleFirst 运算符相反(不是节流而是收集)

问题描述

我需要跟随运算符,当元素到来时,启动计时器(为元素创建窗口)并将它们收集到ListorObservable/Flowable中。当为计时器指定的时间结束并且没有元素到来时,操作员不会发送空事件。当下一个元素到来时,会创建新的计时器并开始收集元素。

Rx java 有 Buffer 和 Window 运算符,但这些运算符有缺点:

可以过滤这个空元素,但我想避免使用空事件(基于计时器)污染调度程序List/Observable-s/Flowable-s

我花了一些时间,发现形式上非常相似,但在功能上起到了相反的作用 throttleFirst(long windowDuration, TimeUnit unit) https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleFirst.png

但不幸的是,它会节流,但不会收集物品。

标签: rx-javareactive-programmingrx-java2

解决方案


我不认为可以节省对象创建,因此如果您想避免空列表,请将它们过滤掉。

至于在新项目迟到时启动新的定期计时器,我想不出现有运营商的任何组合都可以在不丢失项目的情况下做到这一点。

我创建了以下“装置”,可以在不丢失物品的情况下做到这一点:

public static final class BufferWithTimeout<T> {

    Scheduler.Worker trampoline = Schedulers.trampoline().createWorker();

    final long timeout;
    
    final TimeUnit unit;
    
    final Scheduler.Worker worker;
    
    final SerialDisposable timer = new SerialDisposable();
    
    final PublishSubject<List<T>> output = PublishSubject.create();
    
    List<T> current;
    
    long bufferIndex;
    
    BufferWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) {
        this.worker = scheduler.createWorker();
        this.timeout = timeout;
        this.unit = unit;
    }
    
    void onValue(T value) {
        trampoline.schedule(() -> {
            if (timer.isDisposed()) {
                return;
            }
            if (current == null) {
                current = new ArrayList<>();
                long bi = ++bufferIndex;
                timer.set(worker.schedulePeriodically(() -> {
                    onTime(bi);
                }, timeout, timeout, unit));
            }
            current.add(value);
        });
    }

    void onTime(long index) {
        trampoline.schedule(() -> {
            if (index == bufferIndex && current != null) {
                if (current.isEmpty()) {
                    current = null;
                    bufferIndex++;
                    timer.set(null);
                } else {
                    output.onNext(current);
                    current = new ArrayList<>();
                }
            }
        });
    }

    void onTerminate(Throwable error) {
        timer.dispose();
        worker.dispose();
        trampoline.schedule(() -> {
            if (current != null && !current.isEmpty()) {
                output.onNext(current);
                current = null;
            }
            if (error != null) {
                output.onError(error);
            } else {
                output.onComplete();
            }
        });
    }
    
    void dispose() {
        timer.dispose();
        worker.dispose();
        trampoline.schedule(() -> {
            current = null;
        });
    }

    public static <T> ObservableTransformer<T, List<T>> create(
            long timeout, TimeUnit unit, Scheduler scheduler) {
        return o -> 
            Observable.defer(() -> {
                BufferWithTimeout<T> state = new BufferWithTimeout<>(
                    timeout, unit, scheduler);

                return  o
                        .doOnNext(v -> state.onValue(v))
                        .doOnError(e -> state.onTerminate(e))
                        .doOnComplete(() -> state.onTerminate(null))
                        .ignoreElements()
                        .<List<T>>toObservable()
                        .mergeWith(state.output.doOnDispose(state::dispose));
            });
    }
}

您可以通过以下方式尝试:

// generate events over time
        Observable.fromArray(1, 2, 3, 5, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31)
        .flatMap(v -> Observable.timer(v * 100, TimeUnit.MILLISECONDS).map(w -> v))

// apply operator
        .compose(BufferWithTimeout.create(
             700, TimeUnit.MILLISECONDS, Schedulers.computation()
        ))

// wait for it all
        .blockingSubscribe(System.out::println);

请注意,虽然这会为每个源元素创建更多对象,但有办法绕过它,但它会变得更加复杂。


推荐阅读