首页 > 解决方案 > 项目反应堆背压(缓冲区大小?)问题

问题描述

我的目标是并行处理 gui 事件,但只有当我有处理能力加上必须始终处理最后一个事件时。我有一个可以调整大小的面板。每次调整大小都会产生新事件。我想在计算线程池上处理面板的新宽度、高度(Scheduler newParallel = Schedulers.newParallel("Computation", 4);) 以有序的方式。如果没有可用的线程,我需要删除最旧的 gui 事件,当线程可用时,它应该从背压队列中获取最新的。我编写了测试应用程序,但遇到了几个问题。在 gui 事件停止生成后,仍有相当长的时间来完成处理,最终将表现为不需要的动画效果。我的猜测是背压队列 size=256 保留了旧事件并且仍在处理它,但它与结果日志不匹配。在产生 561 个事件后,仅处理了 33 个事件(为什么不是 256 个?),ID 为 [0-32, 560]。有没有办法改变背压缓冲区的大小(我找不到办法),或者我应该用完全不同的方式来处理这个任务?我附上测试​​代码以供娱乐。

import java.util.Random;
import java.util.concurrent.TimeUnit;

import javafx.application.Application;
import javafx.beans.value.ChangeListener;
import javafx.scene.Scene;
import javafx.scene.layout.StackPane;
import javafx.stage.Stage;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class BackpressureApp extends Application {

    public static void main(String[] args) {
        launch(args);
    }

    private int id = 0;

    @Override
    public void start(Stage stage) throws Exception {
        Scheduler computation = Schedulers.newParallel("Computation", 4);

        Flux<Width> flux = Flux.create(sink -> {
            stage.widthProperty().addListener((ChangeListener<Number>) (observable, oldValue, newValue) -> {
                Width width = new Width(id++, newValue.doubleValue());
                System.out.println("[" + Thread.currentThread().getName() + "] PUBLISH width=" + width);
                sink.next(width);
            });
        }, OverflowStrategy.LATEST);

        flux.concatMap(width -> Mono.just(width).subscribeOn(computation).map(this::process))
                .publishOn(Schedulers.single())
                .subscribe(width -> {
            System.out.println("[" + Thread.currentThread().getName() + "] RECEIVED width=" + width);
        });

        stage.setScene(new Scene(new StackPane()));
        stage.show();
    }

    public Width process(Width width) {
        Random random = new Random();
        int next = random.nextInt(1000);
        try {
            TimeUnit.MILLISECONDS.sleep(next);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("[" + Thread.currentThread().getName() + "] PROCESS width=" + width + " sleep=" + next);
        return width;
    }

}

class Width {

    private final int id;
    private final double width;

    public Width(int id, double width) {
        super();
        this.id = id;
        this.width = width;
    }

    public int getId() {
        return id;
    }

    public double getWidth() {
        return width;
    }

    @Override
    public String toString() {
        return "Width[id=" + id + ", width=" + width + "]";
    }
}

标签: project-reactorreactivex

解决方案


推荐阅读