首页 > 解决方案 > RXJava 顺序执行 observable

问题描述

我有多个返回Observable<String>. 每个函数在文件系统上执行命令。我需要一个接一个地执行每个函数,并在 observable 中获取函数的输出。最后,我想要一个Observable<String>包含按函数调用顺序输出的所有函数的输出

单独地,每个功能都按预期工作,但我需要正确合并输出。

我试过 Observable.concatArray(func1, func2, ... ) 像这样:

    return Observable.concatArray(
        func1(),
        func2(),
        func3(), 
        func4()
    );

但这只是保留了 observable 事件的顺序。不是函数的顺序。我的意思是如果 func1 发出事件 A 和 A',而 func2 发出 B 和 B',我将有 A->A'->B->B'。但是 func2 会在 func1 之后立即启动。这导致我的问题是 func1 需要在 func2 启动之前完成。

第一个函数通过 maven 在文件系统上生成目录。所以,一个长期的任务。第二,在这个目录中写入一个文件。但是 concatArray 在第一个之后立即启动第二个。并且命令失败,因为此时该目录不存在。

有没有办法避免像这样丑陋的事情:

Subject<String> result = PublishSubject.create();
Observable<String> func1Obs = funct1(); 
Observable<String> func2Obs = funct2(); 

func1Obs.subscribe(output -> result.onNext(output));
func1Obs.onDoComplete(() -> {
    func2Obs.subscribe(output -> result.onNext(output);
}
return result;

标签: javarx-javarx-java3

解决方案


作为 Suggest Progman,错误与 concatArray 无关,这是要使用的方法。问题是,在我的函数列表中,我使用了这种代码:

public Observable<String> func1() {
    Subject<String> result = PublishSubject.create();
    String output = dosomething()
    result.onNext(output);
}

这里的问题是,当你创建 observable 时,函数 doSomething() 会立即被调用。

解决方案是在Observable.create()您需要 onNext、onComplete 等时使用...:

public Observable<String> func1() {
    // See how we wrap our instruction inside create method
    return Observable.create( result -> {
        String output = dosomething()
        result.onNext(output);   
    });
}

或者Observable.defer(),如果您只需要等待订阅:

public Observable<String> func1() {
    // See how we wrap our instruction inside create method
    return Observable.defer( () -> dosomething());
}

之后,您可以致电:

return Observable.concatArray(
    func1(),
    func2(),
    func3(), 
    func4()
);

推荐阅读