首页 > 解决方案 > 即使条件失败,Akka Streams takeWhile 也会处理下一个元素

问题描述

我有一个非常简单的演员,只打印数字:-

public class PrintLineActor extends AbstractLoggingActor {

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(Integer.class, i -> {
          System.out.println("Processing: " + i);
          sender().tell(i, self());
        }).build();
  }
}

现在,我有一个流来打印偶数,直到遇到奇数元素:-

  @Test
  public void streamsTest() throws Exception {

    ActorSystem system = ActorSystem.create("testSystem");
    ActorRef printActor = system.actorOf(Props.create(PrintLineActor.class));

    Integer[] intArray = new Integer[]{2,4,6,8,9,10,12};
    CompletionStage<List<Integer>> result = Source.from(Arrays.asList(intArray))
        .ask(1, printActor, Integer.class, Timeout.apply(10, TimeUnit.SECONDS))
        .takeWhile(i -> i != 9)
        .runWith(Sink.seq(), ActorMaterializer.create(system));

    List<Integer> result1 = result.toCompletableFuture().get();
    System.out.println("Result :- ");
    result1.forEach(System.out::println);
  }

不希望在处理 9 之后的任何元素也被发送给演员。但是,我看到演员(但不是 12)也在处理数字“10”,如下面的输出所示

Processing: 2
Processing: 4
Processing: 6
Processing: 8
Processing: 9
Processing: 10 //WHY IS THIS BEING PROCESSED BY ACTOR??

Result :- 
2
4
6
8

为什么演员要处理10?如何阻止这种情况?

编辑:

我已经尝试通过记录事件的时间戳来进行调试,只是为了查看是否在 9 实际完成之前处理了 10,但是不,在完全处理 9 之后获取 10。这是日志:-

Before Ask: 2 in 1596035906509
Processing inside Actor: 2 at 1596035906509
Inside TakeWhile 2 at  in 1596035906509

Before Ask: 4 in 1596035906609
Processing inside Actor: 4 at 1596035906610
Inside TakeWhile 4 at  in 1596035906610

Before Ask: 6 in 1596035906712
Processing inside Actor: 6 at 1596035906712
Inside TakeWhile 6 at  in 1596035906712

Before Ask: 8 in 1596035906814
Processing inside Actor: 8 at 1596035906814
Inside TakeWhile 8 at  in 1596035906815

Before Ask: 9 in 1596035906915
Processing inside Actor: 9 at 1596035906915
Inside TakeWhile 9 at  in 1596035906916

Before Ask: 10 in 1596035907017 //so 10 is taken much after the 9 is processed fully
Processing inside Actor: 10 at 1596035907017

Result :- 
2
4
6
8

此外,如果我将 .ask 替换为直接 .map(print..),则不会打印 10。所以当涉及actor.ask时为什么会发生这种情况对我来说很奇怪。

标签: javascalaakkaakka-stream

解决方案


因为你askprintActor 异步而不是同步打印。在您的确切运行中:

  • 消息 9 到达 PrintActor,打印“Processing: 9”
  • 消息 10 到达 PrintActor,打印“Processing: 10”
  • 您的 Akka 流从 PrintActor 接收到消息 9 的响应消息,完成 Akka 流,因此结果中既没有 9 也没有 10。

要解决确切的问题,请删除异步询问并改为同步打印。但不确定 PrintActor 是否只是一个类比,请告诉我。


推荐阅读