rx-java - 如何合并多个生成器?
问题描述
我InputStream
使用生成器阅读:
public static Flowable<byte[]> from(final InputStream is) {
return Flowable.generate(new Consumer<Emitter<byte[]>>() {
@Override
public void accept(Emitter<byte[]> emitter) throws Exception {
byte[] buffer = new byte[1024];
int count = is.read(buffer);
if (count == -1) {
emitter.onComplete();
} else if (count < bufferSize) {
emitter.onNext(Arrays.copyOf(buffer, count));
} else {
emitter.onNext(buffer);
}
}
});
}
没关系。但是我有两个流程:
InputStream stdout = process.getInputStream();
InputStream stderr = process.getErrorStream();
我想用一个线程阅读它们。我认为这是不可能的,因为读取 - 阻塞操作。
但主要任务 - 我想将这些流合并为一个:
//IllegalStateException: onNext already called in this generate turn
Flowable<byte[]> stdOutAndStdErr = from(stderr).mergeWith(from(stdout)
“发电机”有可能吗?
解决方案
这可能晚了几年才能帮助您,但是对于落入这个陷阱的其他人,我给您这篇很棒的文章。
简而言之,您不能使用Flowable.generate()
不受控制的来源。你必须使用Flowable.create()
. 不受控制的来源是您无法控制下一个数据何时出现的来源。受控源类似于迭代器,其中下一个值由消费者控制。
推荐阅读
- regex - 将两封电子邮件(相同号码)与 VBA 和 RegEx 匹配,然后移动两封邮件
- python - 为什么在使用 todoist python api 时会返回这个奇怪的 ID?
- java - 如何将请求正文中的列表值发送到 Rest Api
- powershell - 在 PowerShell 中导出用户的 AD 组成员身份
- enums - 如何改变我正在使用 Enum.map 迭代的 elixir 中的列表?或需要关于使用嵌套递归的意见
- php - 访问 array-key-array-objects 中的数据
- r - 计算值以了解一组数值中的趋势
- ios - 如何在文件swift中写入字节数据
- python - 将预训练的 resnet 模型加载到cleverhans 模型格式
- java - IntelliJ 中的本地 Tomcat - 无法运行程序 catalina.bat