首页 > 解决方案 > Vert.x 中带背压的重计算输出结果

问题描述

有问题的应用程序处理来自客户端的请求,然后需要在服务器端进行大量计算。这个计算是逐个完成的,所以如果客户端阅读速度很慢,这个计算不应该继续进行(计算应该响应背压)。

计算现在表示为 a Supplier<Buffer>,其中get()调用可能需要很长时间并且需要多次调用,直到它响应null(没有更多数据)。应该在单独的get()线程池中调用(与其他请求共享),并且只有在客户端确实能够接受数据时才应该调用。

我目前的代码是:

ReadStream<Buffer> readStream = new MyComplicatedReadStream(supplier, executor)
   .exceptionHandler(request::fail)
   .endHandler(x -> request.response().end());
Pump.pump(readStream, request.response())).start();

我已经做了一个自定义实现ReadStream来做到这一点,这有点工作,但很长,很笨重并且有同步问题。

我没有解决这个问题,而是想知道 vert.x / rx 中是否有一种惯用的方式来实现/实例化 aMyComplicatedReadStream . 因此,对于 aSupplier<Buffer>和一个ExecutorServiceget a ReadStream<Buffer>,它get()使用给定的执行程序执行并且如果它被暂停则不会生成。

标签: javarx-javavert.x

解决方案


我对 vert.x 的经验接近 0,但我对 rxjava 确实有一些经验。所以可能有更好的方法来做到这一点,但从 rxjava 的角度来看,您可以使用generate方法来创建仅按需生成项目的“冷”流动对象。我相信在这种情况下,当流暂停时,supplier.get()由于没有“需求” ,因此不会进行额外的调用

在这里使用 kotlin 语法,但我认为您可以轻松导出 java 版本。

Flowable.generate<Buffer> { emitter ->
  val nextValue = supplier.get()
  if (nextValue == null) {
      emitter.onComplete()
  } else {
      emitter.onNext(nextValue)
  }
}.subscribeOn(Schedulers.from(executor)) // this will make the above callback run in the given executor

由于供应商似乎持有某种状态,在某些情况下,您可能希望为每个消费者生成一个“新供应商”,在这种情况下,您可以使用 generate 方法的重载,该方法允许指定另一个回调来获取州(在您的情况下是供应商)。http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#generate-java.util.concurrent.Callable-io.reactivex.functions.BiConsumer-

看起来你可以将 flowable 转换为读取流:

ReadStream<Buffer> readStream = FlowableHelper.toReadStream(observable);

基于此处的文档:https ://vertx.tk/docs/vertx-rx/java2/#_read_stream_support


推荐阅读