java - 用于确保只有一个请求处于活动状态的阀门
问题描述
我有一个 RxJava2 Flowable 链,其中包含一些繁重的计算。我想确保每个请求只调用一个整个链,并且在任何给定时间最多有一个活动请求。
这是一个示例时间线:
-R-------R--R-R----R-------
--C....E--C....E----C....E-
在上面,- 是空闲时间,R 是输入请求,C 是发生的计算(从 C 开始,用 . 表示处理,用 E 表示计算结束)。
如你看到的:
- 第一个请求触发第一个计算
- 第二个请求触发第二个计算
- 请求 3 和 4 在第二次计算运行时被忽略
- 请求 5 触发第三次计算
这是我到目前为止得到的:
System.out.println("start");
CountDownLatch countDownLatch = new CountDownLatch(1);
AtomicBoolean valve = new AtomicBoolean(true);
Flowable.interval(100, TimeUnit.MILLISECONDS)
.take(30)
.filter(e -> valve.get())
.observeOn(Schedulers.computation())
.map(e -> {
valve.set(false);
System.out.println("map " + e);
Thread.sleep(550);
return e;
})
.observeOn(Schedulers.single())
.subscribe(
e -> {
System.out.println("subscribe " + e);
valve.set(true);
},
e -> {
System.out.println("error " + e);
valve.set(true);
},
() -> {
System.out.println("complete");
valve.set(true);
countDownLatch.countDown();
});
countDownLatch.await();
System.out.println("end");
所以我有这样的流程:
- 发出 30 个整数,它们之间的间隔为 100 毫秒
- 我在计算调度程序上观察到
- 我将阀门设置为 false,以便没有新的请求进入
- 我用 550 毫秒的模拟计算时间来映射它
- 我在单个调度程序上观察结果
- 我将阀门设置为 true,因此可以处理新请求
这似乎工作正常,但我想知道是否可以放置一个仅限 RxJava2 的解决方案,而不是使用 AtomicBoolean 的外部。我正在寻找它有两个原因:
- 我可能会在几个地方需要这个,所以如果 RxJava2 已经支持开箱即用的东西,我不想重复相同的代码
- 我不确定在并发方面是否有任何后果会导致它不起作用。例如,我假设在 filter 方法之后应该将 Valve 设置为 false,因为 observeOn 会切换到另一个线程并且 map 可能运行得太晚,以防请求一个接一个地快速进入
上面的注意CountDownLatch
只是为了测试——我从我的main
方法中运行它,因为有一个observeOn
,所以流程在另一个线程上执行,所以main
在流程执行之前完成。在实际应用中,情况并非如此。
真正的用例是一个应用程序,它有一个按钮,可以触发一些需要几秒钟的计算。如果用户在上一次计算仍在进行中时多次单击,我不希望多次触发此操作。
如果在 RxJava 中对这个特定用例有更好的解决方案,我可以改变流程。
解决方案
推荐阅读
- python - 如何确认 pip install 正在安装到正确的 python 版本?
- facebook - Facebook 应用程序 URL 中的“ref=br_rs”是什么意思?
- python - 如何在必须根据决策替换元素的列表中处理重复项?
- python - 如何保存临时图像,然后用opencv显示它
- html - 我使用 DIV 制作表格,当表格行太多时,我需要通过 TBody div 滚动
- java - 如何让我的 Java 类中的方法返回 findViewById?
- python - 为什么 findAll 不从网页返回预期的 div 数?
- ubuntu - 为什么优雅地关闭 emacs 守护进程很重要?
- swift - 如果我创建一个 UILabel 作为我的 LoadView() 的一部分,与创建一个文件私有 uilabel 有什么区别?
- html - 如何从浏览器手机打开微信并打开其中的链接