java - Vert.x 中带背压的重计算输出结果
问题描述
有问题的应用程序处理来自客户端的请求,然后需要在服务器端进行大量计算。这个计算是逐个完成的,所以如果客户端阅读速度很慢,这个计算不应该继续进行(计算应该响应背压)。
计算现在表示为 a Supplier<Buffer>
,其中get()
调用可能需要很长时间并且需要多次调用,直到它响应null
(没有更多数据)。应该在单独的get()
线程池中调用(与其他请求共享),并且只有在客户端确实能够接受数据时才应该调用。
我目前的代码是:
ReadStream<Buffer> readStream = new MyComplicatedReadStream(supplier, executor)
.exceptionHandler(request::fail)
.endHandler(x -> request.response().end());
Pump.pump(readStream, request.response())).start();
我已经做了一个自定义实现ReadStream
来做到这一点,这有点工作,但很长,很笨重并且有同步问题。
我没有解决这个问题,而是想知道 vert.x / rx 中是否有一种惯用的方式来实现/实例化 aMyComplicatedReadStream
. 因此,对于 aSupplier<Buffer>
和一个ExecutorService
get a ReadStream<Buffer>
,它get()
使用给定的执行程序执行并且如果它被暂停则不会生成。
解决方案
我对 vert.x 的经验接近 0,但我对 rxjava 确实有一些经验。所以可能有更好的方法来做到这一点,但从 rxjava 的角度来看,您可以使用generate
方法来创建仅按需生成项目的“冷”流动对象。我相信在这种情况下,当流暂停时,supplier.get()
由于没有“需求” ,因此不会进行额外的调用
在这里使用 kotlin 语法,但我认为您可以轻松导出 java 版本。
Flowable.generate<Buffer> { emitter ->
val nextValue = supplier.get()
if (nextValue == null) {
emitter.onComplete()
} else {
emitter.onNext(nextValue)
}
}.subscribeOn(Schedulers.from(executor)) // this will make the above callback run in the given executor
由于供应商似乎持有某种状态,在某些情况下,您可能希望为每个消费者生成一个“新供应商”,在这种情况下,您可以使用 generate 方法的重载,该方法允许指定另一个回调来获取州(在您的情况下是供应商)。http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#generate-java.util.concurrent.Callable-io.reactivex.functions.BiConsumer-
看起来你可以将 flowable 转换为读取流:
ReadStream<Buffer> readStream = FlowableHelper.toReadStream(observable);
基于此处的文档:https ://vertx.tk/docs/vertx-rx/java2/#_read_stream_support
推荐阅读
- azure - 使用 ARM 模板动态启用/禁用部署 Azure Function
- python - 字符串匹配并在 Pandas 中获得超过 1 列
- c++ - 是否有解析属性值的工具?
- javascript - extjs 3.4.0 - 如何过滤读取 JSON 的存储
- ios - 如何将字符传说显示为两行?
- swift - 如何更新 MVVM 中的变量?
- javascript - javascript中不同对象数组的一种排序函数
- ios - 如何将企业 iOS App 分发到特定的 iPhone 设备类型
- android - Android Oreo 通知显示在状态栏中,但不显示在点通知中
- makefile - 如何编译 unison 的静态版本