首页 > 解决方案 > 终端操作中基于谓词的短路Java Stream管道

问题描述

我有Stream<E>一些元素(不能重新创建流/遍历它两次),我想将元素减少为单个值。有了Collectors.reducingetc. 这个任务有点容易。但是现在,我想基于一些短路减少Predicate<E>并防止整个流被遍历。下面我将提供一些代码进行更多解释,但问题如下:如何使用 java 流实现这一点?

MCVE 所需的代码:

static class Element<T> {

    private final boolean pred;
    private final T value;

    Element(
            final boolean pred,
            final T value) {
        this.pred = pred;
        this.value = value;
    }
}

@SuppressWarnings("unused")
static <E, T> T implementMe(
        final Stream<E> stream,
        final Predicate<E> shortCircuitCondition,
        final T shortCircuitValue,
        final E identityValue,
        final BinaryOperator<E> operator,
        final Function<E, T> finisher) {
    // how should I implement this?
    return null;
}

@Test
public void test() {
    final Collection<Element<Integer>> elements = Arrays.asList(
            new Element<>(true, 3),
            new Element<>(true, 3),
            new Element<>(false, 3),
            new Element<>(true, 3));

    final AtomicInteger counter = new AtomicInteger(0);
    final Stream<Element<Integer>> stream = elements.stream()
            .peek(t -> counter.incrementAndGet());
    final int value =
            ShortCircuit.<Element<Integer>, Integer> implementMe(
                    stream,
                    e -> !e.pred,
                    Integer.MIN_VALUE,
                    new Element<>(true, 0),
                    (l, r) -> new Element<>(true, l.value + r.value),
                    e -> e.value);

    Assert.assertEquals(Integer.MIN_VALUE, value);
    Assert.assertEquals(3, counter.get());
}

使用简单的 for 循环,我可以通过以下方式实现短路行为:

static <E, T> T implementMe(
        final Stream<E> stream,
        final Predicate<E> shortCircuitCondition,
        final T shortCircuitValue,
        final E identityValue,
        final BinaryOperator<E> operator,
        final Function<E, T> finisher) {
    E current = identityValue;
    for (final E t : (Iterable<E>) stream::iterator) {
        if (shortCircuitCondition.test(t)) {
            return shortCircuitValue;
        }
        current = operator.apply(current, t);
    }

    return finisher.apply(current);
}

但我宁愿使用流 API,所以我尝试使用自定义收集器。这种方法虽然不会短路(断言错误,因为计数器达到 4):

static <E, T> T implementMe(
        final Stream<E> stream,
        final Predicate<E> shortCircuitCondition,
        final T shortCircuitValue,
        final E identityValue,
        final BinaryOperator<E> operator,
        final Function<E, T> finisher) {
    final AtomicBoolean shortCircuit = new AtomicBoolean(false);
    return stream.collect(
            Collectors.collectingAndThen(
                    Collectors.reducing(identityValue, (l, r) -> {
                        if (shortCircuitCondition.test(r)) {
                            shortCircuit.set(true);
                        }
                        return operator.apply(l, r);
                    }),
                    e -> shortCircuit.get() ? shortCircuitValue : finisher.apply(e)));
}

现在,我可以使用自定义拆分器来防止在满足条件后前进到下一个元素:

static <E, T> T implementMe(
        final Stream<E> stream,
        final Predicate<E> shortCircuitCondition,
        final T shortCircuitValue,
        final E identityValue,
        final BinaryOperator<E> operator,
        final Function<E, T> finisher) {

    final AtomicBoolean shortCircuit = new AtomicBoolean(false);
    final Iterator<E> underlyingIterator = stream.iterator();
    final Stream<E> wrappedStream = StreamSupport.stream(
            new Spliterators.AbstractSpliterator<E>(0, 0) {

                @Override
                public boolean tryAdvance(final Consumer<? super E> action) {
                    if (!underlyingIterator.hasNext()) {
                        return false;
                    }
                    final E next = underlyingIterator.next();

                    if (shortCircuitCondition.test(next)) {
                        shortCircuit.set(true);
                        return false;
                    }

                    action.accept(next);
                    return true;
                }

            }, false);

    return wrappedStream.collect(
            Collectors.collectingAndThen(
                    Collectors.reducing(
                            identityValue,
                            operator),
                    e -> shortCircuit.get() ? shortCircuitValue : finisher.apply(e)));
}

这些方法都不够简单。我的问题如下:


编辑:我意识到takeWhile在这种情况下该操作可能是我想要的(这里提到了 java 8 解决方案)。虽然我的担忧仍然存在并且我想听到他们的好答案,但我更新的问题是我们是否也可以在终端操作中停止迭代(takeWhile作为短路的有状态中间操作),例如,如果我们想要停止 -一些有状态的累加器的条件。

示例代码:

class Acc implements Consumer<Integer> {

    int value = 0;

    @Override
    public void accept(Integer i) {
        value += i;
    }

    public Acc merge(Acc other) {
        value += other.value;
        return this;
    }
}

Predicate<Acc> threshold = a -> a.value > 6;
Arrays.asList(3, 4, 5).stream()
            .collect(Acc::new, (acc, e) -> {
                if(threshold.test(acc)) {
                    // How do I stop the traversal here?
                }
                acc.accept(e);
            }, Acc::merge);

标签: javajava-8java-stream

解决方案


推荐阅读