首页 > 解决方案 > 用于确保只有一个请求处于活动状态的阀门

问题描述

我有一个 RxJava2 Flowable 链,其中包含一些繁重的计算。我想确保每个请求只调用一个整个链,并且在任何给定时间最多有一个活动请求。

这是一个示例时间线:

-R-------R--R-R----R-------
--C....E--C....E----C....E-

在上面,- 是空闲时间,R 是输入请求,C 是发生的计算(从 C 开始,用 . 表示处理,用 E 表示计算结束)。

如你看到的:

这是我到目前为止得到的:

System.out.println("start");

CountDownLatch countDownLatch = new CountDownLatch(1);

AtomicBoolean valve = new AtomicBoolean(true);

Flowable.interval(100, TimeUnit.MILLISECONDS)
   .take(30)
   .filter(e -> valve.get())
   .observeOn(Schedulers.computation())
   .map(e -> {
      valve.set(false);

      System.out.println("map " + e);
      Thread.sleep(550);
      return e;
   })
   .observeOn(Schedulers.single())
   .subscribe(
      e -> {
         System.out.println("subscribe " + e);
         valve.set(true);
      },
      e -> {
         System.out.println("error " + e);
         valve.set(true);
      },
      () -> {
         System.out.println("complete");
         valve.set(true);
         countDownLatch.countDown();
      });

countDownLatch.await();

System.out.println("end");

所以我有这样的流程:

这似乎工作正常,但我想知道是否可以放置一个仅限 RxJava2 的解决方案,而不是使用 AtomicBoolean 的外部。我正在寻找它有两个原因:

上面的注意CountDownLatch只是为了测试——我从我的main方法中运行它,因为有一个observeOn,所以流程在另一个线程上执行,所以main在流程执行之前完成。在实际应用中,情况并非如此。

真正的用例是一个应用程序,它有一个按钮,可以触发一些需要几秒钟的计算。如果用户在上一次计算仍在进行中时多次单击,我不希望多次触发此操作。

如果在 RxJava 中对这个特定用例有更好的解决方案,我可以改变流程。

标签: javarx-java2

解决方案


推荐阅读