首页 > 解决方案 > 在 Emitter.onComplete 之后缓冲的 Flowable 中的线程中断

问题描述

我有一个Flowable赞:

Flowable.<String>create(onSubscribe, BackpressureStrategy.DROP)
    .doOnSubscribe(sub -> {
        System.out.println("onSubscribe");
    })
    .onBackpressureDrop(sns2 -> LOG.warn("Backpressure, dropping " + Arrays.asList(sns2)))
    .buffer(1, TimeUnit.SECONDS)
    .doOnTerminate(() -> {
        System.out.println("onTerminate");
    })
    .onErrorReturn(error -> {
        System.out.println("Error, will cancel scan: " + error);
        throw new RuntimeException(error);
    })
    .subscribe(objs -> /* NIO work here */);

当我在调用onComplete后立即调用源发射器时,我在lambdaonNext内部得到一个中断。subscribe我认为这是预期的行为:https ://github.com/ReactiveX/RxJava/issues/3601 。这似乎发生在订阅者正在处理来自缓冲区的下游对象时(我想如果它是单线程的,那就没有问题了)。

这是一个问题,因为我在这里执行 NIO 工作,这对Thread.interrupt. 这发生在缓冲区完成发送所有工作之前,因此其中一些objs没有完全处理。

这与Scheduler? 我应该使用 IO 吗?我如何“保护”在订阅者中执行的工作

标签: rx-java

解决方案


我应该在 IO 线程上注册我的订阅者:

    .subscribeOn(Schedulers.io(), false)
    .subscribe(objs -> /* NIO work here */);

我想我是在假设,由于 upstream buffer,中断被放置在来自 的线程上buffer,这也将在 . 选择的任何线程上完成subscribeOn。但似乎情况并非如此。

如果有人可以提供更好的解释,我想阅读它。


推荐阅读