java - 即使条件失败,Akka Streams takeWhile 也会处理下一个元素
问题描述
我有一个非常简单的演员,只打印数字:-
public class PrintLineActor extends AbstractLoggingActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Integer.class, i -> {
System.out.println("Processing: " + i);
sender().tell(i, self());
}).build();
}
}
现在,我有一个流来打印偶数,直到遇到奇数元素:-
@Test
public void streamsTest() throws Exception {
ActorSystem system = ActorSystem.create("testSystem");
ActorRef printActor = system.actorOf(Props.create(PrintLineActor.class));
Integer[] intArray = new Integer[]{2,4,6,8,9,10,12};
CompletionStage<List<Integer>> result = Source.from(Arrays.asList(intArray))
.ask(1, printActor, Integer.class, Timeout.apply(10, TimeUnit.SECONDS))
.takeWhile(i -> i != 9)
.runWith(Sink.seq(), ActorMaterializer.create(system));
List<Integer> result1 = result.toCompletableFuture().get();
System.out.println("Result :- ");
result1.forEach(System.out::println);
}
我不希望在处理 9 之后的任何元素也被发送给演员。但是,我看到演员(但不是 12)也在处理数字“10”,如下面的输出所示
Processing: 2
Processing: 4
Processing: 6
Processing: 8
Processing: 9
Processing: 10 //WHY IS THIS BEING PROCESSED BY ACTOR??
Result :-
2
4
6
8
为什么演员要处理10?如何阻止这种情况?
编辑:
我已经尝试通过记录事件的时间戳来进行调试,只是为了查看是否在 9 实际完成之前处理了 10,但是不,在完全处理 9 之后获取 10。这是日志:-
Before Ask: 2 in 1596035906509
Processing inside Actor: 2 at 1596035906509
Inside TakeWhile 2 at in 1596035906509
Before Ask: 4 in 1596035906609
Processing inside Actor: 4 at 1596035906610
Inside TakeWhile 4 at in 1596035906610
Before Ask: 6 in 1596035906712
Processing inside Actor: 6 at 1596035906712
Inside TakeWhile 6 at in 1596035906712
Before Ask: 8 in 1596035906814
Processing inside Actor: 8 at 1596035906814
Inside TakeWhile 8 at in 1596035906815
Before Ask: 9 in 1596035906915
Processing inside Actor: 9 at 1596035906915
Inside TakeWhile 9 at in 1596035906916
Before Ask: 10 in 1596035907017 //so 10 is taken much after the 9 is processed fully
Processing inside Actor: 10 at 1596035907017
Result :-
2
4
6
8
此外,如果我将 .ask 替换为直接 .map(print..),则不会打印 10。所以当涉及actor.ask时为什么会发生这种情况对我来说很奇怪。
解决方案
因为你ask
printActor 异步而不是同步打印。在您的确切运行中:
- 消息 9 到达 PrintActor,打印“Processing: 9”
- 消息 10 到达 PrintActor,打印“Processing: 10”
- 您的 Akka 流从 PrintActor 接收到消息 9 的响应消息,完成 Akka 流,因此结果中既没有 9 也没有 10。
要解决确切的问题,请删除异步询问并改为同步打印。但不确定 PrintActor 是否只是一个类比,请告诉我。
推荐阅读
- r - 在 R 中计算 Returns1>Returns2 的天数
- python-3.x - 是否可以对 conda 或 pipenv 环境进行分层?
- c - C:无法从连接到子进程标准输出的管道中读取
- r - 如何连接连字符并分成两行的单词
- javascript - 在内联css样式中预期“,”
- python - Pylint Import "import routes" 应该放在模块的顶部
- python - 用另一个函数替换一个函数
- r - 将几行放入R中的一列
- python - keras Tensorflow 2中GRU和LSTM层中的num_units - 混淆含义
- discord - discord.py 冷却错误不允许引发其他错误