java - RxJava 基于回调向 Stream 添加事件
问题描述
添加一些代码来解决问题
//generates a sequence in the range from input value (+1) to input value (+9)
Observable<ColoredIntegerModel> getSequenceObservable(int value, int delay, int color) {
return Observable.range(value+1,9)
.map(i -> {
Log.d(TAG, "Value " + i
+ " evaluating on " + Thread.currentThread().getName()
+ " emitting item at " + System.currentTimeMillis());
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
}
return new ColoredIntegerModel(i, color);
});
}
//creates a stream if say input =2 of numbers from 1-20 (input*2) such that the output is 1 (Red color) 2-10(green color) 11 (Red color) 11-20 (Green Color)
Observable<ColoredIntegerModel> getEventStream(int value) {
return Observable.create(new ObservableOnSubscribe<ColoredIntegerModel>() {
@Override
public void subscribe(ObservableEmitter<ColoredIntegerModel> emitter) throws Exception {
for (int i = 0; i < value; ++i) {
ColoredIntegerModel model = new ColoredIntegerModel(i*10, Color.RED);
emitter.onNext(model);
Observable<ColoredIntegerModel> more = getSequenceObservable(i*10, 100, Color.GREEN);
more.subscribe(new Consumer<ColoredIntegerModel>() {
@Override
public void accept(ColoredIntegerModel coloredIntegerModel) throws Exception {
emitter.onNext(coloredIntegerModel);
}
});
}
}
});
}
上面的代码有效。它打印 1(红色)2-10(绿色)11(红色)、12-20,但我想要一个更清洁的解决方案。我也不确定何时可以关闭 getEventStream() 中的内部订阅。
问题基本上是 getEventStream 正在为每个返回一个 Observable 的发射调用一个函数。这类似于 Promise 链,其中每个单独的 Promise 都可以返回一系列其他 Promise。希望这可以澄清对原始问题的任何困惑。
解决方案
你应该看看FlatMap 运算符
简而言之,它将 an 中的每个元素Observable
转换为它自己的元素Observable
并将它们连接起来。
解决您的问题的最简单方法可能是:
getEventStream()
.flatMap(it -> getSequenceObservable(it))
.doOnNext(System.out::print)
.blockingSubscribe();
辅助函数在哪里
static Observable<ColoredIntegerModel> getEventStream() {
return Observable.fromArray(
new ColoredIntegerModel(10, Color.RED),
new ColoredIntegerModel(20, Color.RED)
);
}
static Observable<ColoredIntegerModel> getSequenceObservable(ColoredIntegerModel color) {
return Observable.range(1, 10)
.flatMap(it -> Observable.timer(it, TimeUnit.SECONDS)
.map(time -> new ColoredIntegerModel(time.intValue(), Color.GREEN))
);
}
如果您想保留原始价值,getEventStream()
您可以使用类似的东西而不是getSequenceObservable
static Observable<ColoredIntegerModel> getSequenceObservable(ColoredIntegerModel color) {
return Observable.range(1, 10)
.flatMap(it -> Observable.timer(it, TimeUnit.MILLISECONDS)
.map(time -> new ColoredIntegerModel(time.intValue(), Color.GREEN)))
.concatWith(Observable.just(color));
}
如果排放的顺序很重要,请使用带有 maxConcurrency 的 flatMap 版本:
getEventStream()
.flatMap(it -> getSequenceObservable(it), true, 1)
.doOnNext(System.out::println)
.blockingSubscribe();
推荐阅读
- python-3.x - 多参数 url 模式 django 2.0
- jenkins - 访问其他作业声明性管道中的另一个目录
- reactjs - 如何在 ReactJS 中的另一个类中呈现类列表?
- json - JSON - 网络路径中的反斜杠
- dialogflow-es - Dialogflow:如何获取 sys.boolean?
- mysql - MySQL 中的“2018-03-22 00:00:00”有什么问题?
- ios - 用 Swift 中的另一个实例替换 UIView 的 self
- android - 黄油刀代码不起作用
- vb.net - 等到线程完成后再运行下一步
- batch-file - 批处理文件删除 X 天旧文件排除几个文件