首页 > 解决方案 > 如何合并多个生成器?

问题描述

InputStream使用生成器阅读:

public static Flowable<byte[]> from(final InputStream is) {
    return Flowable.generate(new Consumer<Emitter<byte[]>>() {
        @Override
        public void accept(Emitter<byte[]> emitter) throws Exception {
            byte[] buffer = new byte[1024];
            int count = is.read(buffer);
            if (count == -1) {
                emitter.onComplete();
            } else if (count < bufferSize) {
                emitter.onNext(Arrays.copyOf(buffer, count));
            } else {
                emitter.onNext(buffer);
            }
        }
    });
}

没关系。但是我有两个流程:

InputStream stdout = process.getInputStream();
InputStream stderr = process.getErrorStream();

我想用一个线程阅读它们。我认为这是不可能的,因为读取 - 阻塞操作。

主要任务 - 我想将这些流合并为一个:

//IllegalStateException: onNext already called in this generate turn
Flowable<byte[]> stdOutAndStdErr = from(stderr).mergeWith(from(stdout)

“发电机”有可能吗?

标签: rx-javareactive-programmingrx-java2

解决方案


这可能晚了几年才能帮助您,但是对于落入这个陷阱的其他人,我给您这篇很棒的文章

简而言之,您不能使用Flowable.generate()不受控制的来源。你必须使用Flowable.create(). 不受控制的来源是您无法控制下一个数据何时出现的来源。受控源类似于迭代器,其中下一个值由消费者控制。


推荐阅读