首页 > 解决方案 > 没有初始值的rxjava扫描函数

问题描述

我想要这样的扫描功能:

fun Observable<T>.scan(initialValueProvider: (T) -> R, accumulator: (R, T) -> R)

基本上它使用 initialValueProvider 作为第一个发出的值,并使用累加器作为后面的值。

这个扫描函数与 scanWith 函数非常相似,不同的是我想用 initialValueProvider 从第一个发射的项目中产生第一个值。

我需要为它编写一个自定义运算符吗?还是我们已经有了某种组合来获得该功能?

标签: kotlinrx-javarx-java2

解决方案


您确实必须使用有状态转换或编写自定义运算符。例如:

Observable.defer(() -> {
    AtomicReference<R> current = new AtomicReference<>();
    return source.map(v -> {
        R acc = current.get();
        if (acc == null) {
            acc = Objects.requireNonNull(initialValueProvider.apply(v));
            current.set(acc);
        } else {
            acc = Objects.requireNonNull(accumulator.apply(acc, v));
            current.set(acc);
        }
        return acc;
    });
});

推荐阅读