Thread pipelining with RxJava

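Question

RxJava gurus, this is your chance to shine!

Can you make sure the following program does not throw an IllegalStateException by changing only the RxJava pipeline that starts with Flowable.generate() in the main() method?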

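import static java.util.concurrent.Executors.newFixedThreadPool;

import io.reactivex.Emitter;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

class ExportJob {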
    private static Scheduler singleThread(String threadName) {
        return Schedulers.from(newFixedThreadPool(1, r -> {
            Thread t = new Thread(r, threadName);
            t.setDaemon(true);
            return t;
        }));
    }

    public static void main(String[] args) {
        Scheduler genSched = singleThread("genThread");
        Scheduler mapSched = singleThread("mapThread");
        // execute on "genThread"
        Flowable.generate(ExportJob::infiniteGenerator)
                .subscribeOn(genSched, false)
                // execute on "mapThread"
                .observeOn(mapSched, false)
                .concatMapMaybe(ExportJob::mapping)
                // execute on the thread that creates the pipeline, block it until finished
                .blockingForEach(ExportJob::terminal);
    }

    private static int nb;
    /** Must execute on "genThread" thread. */
    private static void infiniteGenerator(Emitter<Integer> emitter) {
        print(nb, "infiniteGenerator");
        emitter.onNext(nb++);
        checkCurrentThread("genThread");
    }

    /** Must execute on "mapThread" thread. */
    private static Maybe<Integer> mapping(Integer s) {
        print(s, "mapping");
        checkCurrentThread("mapThread");
        return Maybe.just(s);
    }

    /** Must execute on the "main" thread, i.e. the thread that creates the pipeline. */
    private static void terminal(Integer s) {
        print(s, "terminal");
        checkCurrentThread("main");
    }

    private static void print(int item, String method) {
        System.out.format("%d - %s - %s()%n", item, Thread.currentThread().getName(), method);
    }

    private static void checkCurrentThread(String expectedThreadName) throws IllegalStateException {
        String name = Thread.currentThread().getName();
        if (!name.equals(expectedThreadName)) {
            throw new IllegalStateException("Thread changed from '" + expectedThreadName + "' to '" + name + "'");
        }
    }
}

Tags: multithreading, rx-java2, scheduling

Answer


You have to use subscribeOn(scheduler, true) so that downstream requests are also routed back to the intended thread:

Flowable.generate(ExportJob::infiniteGenerator)
        // requestOn = true: requests reaching the generator are executed on "genThread"
        .subscribeOn(genSched, true)
        // execute on "mapThread"
        .observeOn(mapSched, false)
        .concatMapMaybe(ExportJob::mapping)
        // requestOn = true: requests coming from the blocking consumer are executed on "mapThread"
        .subscribeOn(mapSched, true)
        .blockingForEach(ExportJob::terminal);
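
To make the effect of that second argument concrete, here is a small toy model written with plain java.util.concurrent. It is not RxJava's implementation, only a sketch of the idea: a source that produces one item per request either does the work on whichever thread called request(), or hands the request over to its own worker thread first. The names RequestRoutingSketch, Source and run are hypothetical and exist only for this illustration; the boolean is called requestOn after the parameter of subscribeOn(Scheduler, boolean).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Toy model of request routing; hypothetical names, not RxJava code. */
public class RequestRoutingSketch {

    /** Stand-in for a generator that produces one item per request. */
    static final class Source {
        private final ExecutorService worker;
        private final boolean requestOn;
        /** Records the name of the thread that did the producing for each request. */
        final List<String> producerThreads = new CopyOnWriteArrayList<>();

        Source(ExecutorService worker, boolean requestOn) {
            this.worker = worker;
            this.requestOn = requestOn;
        }

        void request() {
            Runnable produce = () -> producerThreads.add(Thread.currentThread().getName());
            if (requestOn) {
                worker.execute(produce); // hop back to the source's own thread
            } else {
                produce.run();           // run on whichever thread asked for more
            }
        }
    }

    /** Issues three requests from "mapThread" and reports where the items were produced. */
    static List<String> run(boolean requestOn) throws Exception {
        ExecutorService gen = Executors.newSingleThreadExecutor(r -> new Thread(r, "genThread"));
        ExecutorService map = Executors.newSingleThreadExecutor(r -> new Thread(r, "mapThread"));
        try {
            Source source = new Source(gen, requestOn);
            // The downstream stage lives on "mapThread" and asks for more items from there.
            map.submit(() -> {
                for (int i = 0; i < 3; i++) {
                    source.request();
                }
            }).get();
            // Let any work that was handed to "genThread" finish before reading the result.
            gen.shutdown();
            gen.awaitTermination(5, TimeUnit.SECONDS);
            return source.producerThreads;
        } finally {
            gen.shutdownNow();
            map.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("requestOn=false -> " + run(false));
        System.out.println("requestOn=true  -> " + run(true));
    }
}
```

Running main prints [mapThread, mapThread, mapThread] for requestOn=false and [genThread, genThread, genThread] for requestOn=true. In the real pipeline the same pattern plausibly explains both fixes: with false, a request issued by a later stage can end up running the generator or the mapper on the requester's thread, which is what checkCurrentThread rejects.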
