project-reactor - 项目反应堆背压(缓冲区大小?)问题
问题描述
我的目标是并行处理 gui 事件,但只有当我有处理能力加上必须始终处理最后一个事件时。我有一个可以调整大小的面板。每次调整大小都会产生新事件。我想在计算线程池上处理面板的新宽度、高度(Scheduler newParallel = Schedulers.newParallel("Computation", 4);
) 以有序的方式。如果没有可用的线程,我需要删除最旧的 gui 事件,当线程可用时,它应该从背压队列中获取最新的。我编写了测试应用程序,但遇到了几个问题。在 gui 事件停止生成后,仍有相当长的时间来完成处理,最终将表现为不需要的动画效果。我的猜测是背压队列 size=256 保留了旧事件并且仍在处理它,但它与结果日志不匹配。在产生 561 个事件后,仅处理了 33 个事件(为什么不是 256 个?),ID 为 [0-32, 560]。有没有办法改变背压缓冲区的大小(我找不到办法),或者我应该用完全不同的方式来处理这个任务?我附上测试代码以供娱乐。
import java.util.Random;
import java.util.concurrent.TimeUnit;
import javafx.application.Application;
import javafx.beans.value.ChangeListener;
import javafx.scene.Scene;
import javafx.scene.layout.StackPane;
import javafx.stage.Stage;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class BackpressureApp extends Application {
public static void main(String[] args) {
launch(args);
}
private int id = 0;
@Override
public void start(Stage stage) throws Exception {
Scheduler computation = Schedulers.newParallel("Computation", 4);
Flux<Width> flux = Flux.create(sink -> {
stage.widthProperty().addListener((ChangeListener<Number>) (observable, oldValue, newValue) -> {
Width width = new Width(id++, newValue.doubleValue());
System.out.println("[" + Thread.currentThread().getName() + "] PUBLISH width=" + width);
sink.next(width);
});
}, OverflowStrategy.LATEST);
flux.concatMap(width -> Mono.just(width).subscribeOn(computation).map(this::process))
.publishOn(Schedulers.single())
.subscribe(width -> {
System.out.println("[" + Thread.currentThread().getName() + "] RECEIVED width=" + width);
});
stage.setScene(new Scene(new StackPane()));
stage.show();
}
public Width process(Width width) {
Random random = new Random();
int next = random.nextInt(1000);
try {
TimeUnit.MILLISECONDS.sleep(next);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("[" + Thread.currentThread().getName() + "] PROCESS width=" + width + " sleep=" + next);
return width;
}
}
class Width {
private final int id;
private final double width;
public Width(int id, double width) {
super();
this.id = id;
this.width = width;
}
public int getId() {
return id;
}
public double getWidth() {
return width;
}
@Override
public String toString() {
return "Width[id=" + id + ", width=" + width + "]";
}
}
解决方案
推荐阅读
- sql - 如何获取活动 SQL Server 的路径
- javascript - jQuery 显示/隐藏所需的密码字段
- c# - 填写 PDF 表格的问题。我应该使用 C# 和 IText AcroFields 以编程方式从数据库中填写表单吗?还是这个解决方案已经过时了?
- r - R中的多线子图
- firefox - Firefox 中的SVG渲染已关闭
- dialogflow-es-fulfillment - 当用户单击对话框流按钮链接时如何触发后续意图
- sql-server - Django 迁移不包含与 MSSQL 数据库的复合外键关系和复合主键
- apache-kafka - 是否可以在 DCOS 中监控 kafka 生产者/消费者指标?
- android - 我正在开发 Cordova/Ionic 应用程序,全屏视频播放已停止在 Chrome 73 中工作
- ajax - 我在 django 中对这个 AJAX 做错了什么?