java - 终端操作中基于谓词的短路Java Stream管道
问题描述
我有Stream<E>
一些元素(不能重新创建流/遍历它两次),我想将元素减少为单个值。有了Collectors.reducing
etc. 这个任务有点容易。但是现在,我想基于一些短路减少Predicate<E>
并防止整个流被遍历。下面我将提供一些代码进行更多解释,但问题如下:如何使用 java 流实现这一点?
MCVE 所需的代码:
static class Element<T> {
private final boolean pred;
private final T value;
Element(
final boolean pred,
final T value) {
this.pred = pred;
this.value = value;
}
}
@SuppressWarnings("unused")
static <E, T> T implementMe(
final Stream<E> stream,
final Predicate<E> shortCircuitCondition,
final T shortCircuitValue,
final E identityValue,
final BinaryOperator<E> operator,
final Function<E, T> finisher) {
// how should I implement this?
return null;
}
@Test
public void test() {
final Collection<Element<Integer>> elements = Arrays.asList(
new Element<>(true, 3),
new Element<>(true, 3),
new Element<>(false, 3),
new Element<>(true, 3));
final AtomicInteger counter = new AtomicInteger(0);
final Stream<Element<Integer>> stream = elements.stream()
.peek(t -> counter.incrementAndGet());
final int value =
ShortCircuit.<Element<Integer>, Integer> implementMe(
stream,
e -> !e.pred,
Integer.MIN_VALUE,
new Element<>(true, 0),
(l, r) -> new Element<>(true, l.value + r.value),
e -> e.value);
Assert.assertEquals(Integer.MIN_VALUE, value);
Assert.assertEquals(3, counter.get());
}
使用简单的 for 循环,我可以通过以下方式实现短路行为:
static <E, T> T implementMe(
final Stream<E> stream,
final Predicate<E> shortCircuitCondition,
final T shortCircuitValue,
final E identityValue,
final BinaryOperator<E> operator,
final Function<E, T> finisher) {
E current = identityValue;
for (final E t : (Iterable<E>) stream::iterator) {
if (shortCircuitCondition.test(t)) {
return shortCircuitValue;
}
current = operator.apply(current, t);
}
return finisher.apply(current);
}
但我宁愿使用流 API,所以我尝试使用自定义收集器。这种方法虽然不会短路(断言错误,因为计数器达到 4):
static <E, T> T implementMe(
final Stream<E> stream,
final Predicate<E> shortCircuitCondition,
final T shortCircuitValue,
final E identityValue,
final BinaryOperator<E> operator,
final Function<E, T> finisher) {
final AtomicBoolean shortCircuit = new AtomicBoolean(false);
return stream.collect(
Collectors.collectingAndThen(
Collectors.reducing(identityValue, (l, r) -> {
if (shortCircuitCondition.test(r)) {
shortCircuit.set(true);
}
return operator.apply(l, r);
}),
e -> shortCircuit.get() ? shortCircuitValue : finisher.apply(e)));
}
现在,我可以使用自定义拆分器来防止在满足条件后前进到下一个元素:
static <E, T> T implementMe(
final Stream<E> stream,
final Predicate<E> shortCircuitCondition,
final T shortCircuitValue,
final E identityValue,
final BinaryOperator<E> operator,
final Function<E, T> finisher) {
final AtomicBoolean shortCircuit = new AtomicBoolean(false);
final Iterator<E> underlyingIterator = stream.iterator();
final Stream<E> wrappedStream = StreamSupport.stream(
new Spliterators.AbstractSpliterator<E>(0, 0) {
@Override
public boolean tryAdvance(final Consumer<? super E> action) {
if (!underlyingIterator.hasNext()) {
return false;
}
final E next = underlyingIterator.next();
if (shortCircuitCondition.test(next)) {
shortCircuit.set(true);
return false;
}
action.accept(next);
return true;
}
}, false);
return wrappedStream.collect(
Collectors.collectingAndThen(
Collectors.reducing(
identityValue,
operator),
e -> shortCircuit.get() ? shortCircuitValue : finisher.apply(e)));
}
这些方法都不够简单。我的问题如下:
- 有没有简单的方法来实现这个?
- 看到后
Sink.cancellationRequested
:有没有办法提供自定义的 Sink 实现,或者自定义的收集器/终端操作,让流的遍历短路? - 我的最后一种方法是否有意义,还是应该只使用 for 循环?
编辑:我意识到takeWhile
在这种情况下该操作可能是我想要的(这里提到了 java 8 解决方案)。虽然我的担忧仍然存在并且我想听到他们的好答案,但我更新的问题是我们是否也可以在终端操作中停止迭代(takeWhile
作为短路的有状态中间操作),例如,如果我们想要停止 -一些有状态的累加器的条件。
示例代码:
class Acc implements Consumer<Integer> {
int value = 0;
@Override
public void accept(Integer i) {
value += i;
}
public Acc merge(Acc other) {
value += other.value;
return this;
}
}
Predicate<Acc> threshold = a -> a.value > 6;
Arrays.asList(3, 4, 5).stream()
.collect(Acc::new, (acc, e) -> {
if(threshold.test(acc)) {
// How do I stop the traversal here?
}
acc.accept(e);
}, Acc::merge);
解决方案
推荐阅读
- java - 如何使用 MediaMetadataRetriever() 从 .mp3 文件中检索专辑封面
- mysql - mySql每隔一个工作日获取日期
- javascript - 包含一个或多个 a 的字符串的正则表达式
- events - 事件溯源和密码更改安全隐患
- python - Python中的“无”是什么意思?
- java - 我需要将 UI 初始化代码放在 IntelliJ Idea 插件中的什么位置?
- android - FragmentPagerAdapter 使用 getChildFragmentManager() 崩溃
- html - 更改信用卡占位符颜色
- c# - 手动创建的 UserControl 未触发 SizeChanged 事件
- google-chrome - 在本地服务器中使用地理定位 API