rx-java - 在 Emitter.onComplete 之后缓冲的 Flowable 中的线程中断
问题描述
我有一个Flowable
赞:
Flowable.<String>create(onSubscribe, BackpressureStrategy.DROP)
.doOnSubscribe(sub -> {
System.out.println("onSubscribe");
})
.onBackpressureDrop(sns2 -> LOG.warn("Backpressure, dropping " + Arrays.asList(sns2)))
.buffer(1, TimeUnit.SECONDS)
.doOnTerminate(() -> {
System.out.println("onTerminate");
})
.onErrorReturn(error -> {
System.out.println("Error, will cancel scan: " + error);
throw new RuntimeException(error);
})
.subscribe(objs -> /* NIO work here */);
当我在调用onComplete
后立即调用源发射器时,我在lambdaonNext
内部得到一个中断。subscribe
我认为这是预期的行为:https ://github.com/ReactiveX/RxJava/issues/3601 。这似乎发生在订阅者正在处理来自缓冲区的下游对象时(我想如果它是单线程的,那就没有问题了)。
这是一个问题,因为我在这里执行 NIO 工作,这对Thread.interrupt
. 这发生在缓冲区完成发送所有工作之前,因此其中一些objs
没有完全处理。
这与Scheduler
? 我应该使用 IO 吗?我如何“保护”在订阅者中执行的工作
解决方案
我应该在 IO 线程上注册我的订阅者:
.subscribeOn(Schedulers.io(), false)
.subscribe(objs -> /* NIO work here */);
我想我是在假设,由于 upstream buffer
,中断被放置在来自 的线程上buffer
,这也将在 . 选择的任何线程上完成subscribeOn
。但似乎情况并非如此。
如果有人可以提供更好的解释,我想阅读它。
推荐阅读
- graalvm - 如何判断您的 Java 程序是否在 GraalVM AOT 上下文中运行?
- java - 使用 Jmeter 连接到 MQ,并提供返回地址
- python - 如何将变量注入到 Django 模板的上下文中,类似于内置“url”标签的“as”参数?
- ios - 如何在 Swift 中将 UIColor 转换为 3/4/6/8 位十六进制字符串?
- gradle - 只为一个子项目调用 Gradle 任务
- java - Java Android If Else Break 出现问题故障排除
- python - Scipy welch 和 MATLAB pwelch 不提供相同的答案
- java - Springboot 测试未返回预期结果
- c# - 如何让我的角色以第三人称在镜头周围跑来跑去?
- javascript - 似乎无法将我的 CSS 和 JS/Jquery 文件集成到我的 HTML 中