首页 > 解决方案 > Akka Scheduler:仅在当前运行完成时运行下一个

问题描述

使用 Akka Scheduler 调度作业是这样的(至少从文档中):

system.scheduler().schedule(
    Duration.Zero(),
    Duration.create(5, TimeUnit.SECONDS),
    workerActor,
    new MessageToTheActor(),
    system.dispatcher(), ActorRef.noSender());

但是,我不明白如何确保下一次运行仅在当前运行完成时发生。我一直在四处寻找没有成功:(

标签: javaakka

解决方案


对于您的用例,调度程序是错误的工具。

另一种选择是 Akka Stream 的Sink.actorRefWithAck(下面的代码改编自链接文档中的示例,并借用了那里定义的实用程序类)。您需要调整工作角色以处理与流状态相关的一些消息并回复确认消息。确认消息充当背压信号,并指示参与者已准备好处理下MessageToTheActor一条消息。工人演员看起来像下面这样:

enum Ack {
  INSTANCE;
}

static class StreamInitialized {}
static class StreamCompleted {}
static class StreamFailure {
  private final Throwable cause;
  public StreamFailure(Throwable cause) { this.cause = cause; }

  public Throwable getCause() { return cause; }
}

public class MyWorker extends AbstractLoggingActor {
  @Override
  public Receive createReceive() {
    return receiveBuilder()
      .match(StreamInitialized.class, init -> {
        log().info("Stream initialized");
        sender().tell(Ack.INSTANCE, self());
      })
      .match(MessageToTheActor.class, msg -> {
        log().info("Received message: {}", msg);
        // do something with the message...
        sender().tell(Ack.INSTANCE, self());
      })
      .match(StreamCompleted.class, completed -> {
        log().info("Stream completed");
      })
      .match(StreamFailure.class, failed -> {
        log().error(failed.getCause(),"Stream failed!");
      })
      .build();
  }
}

Sink.actorRefWithAck与上述演员一起使用:

final ActorSystem system = ActorSystem.create("MySystem");
final Materializer materializer = ActorMaterializer.create(system);

ActorRef workerActor = system.actorOf(Props.create(MyWorker.class, "worker"));

Source<MessageToTheActor, NotUsed> messages = Source.repeat(new MessageToTheActor());

Sink<String, NotUsed> sink = Sink.<String>actorRefWithAck(
  workerActor,
  new StreamInitialized(),
  Ack.INSTANCE,
  new StreamCompleted(),
  ex -> new StreamFailure(ex)
);

messages.runWith(sink, materializer);

请注意 的使用Source.repeat,在这种情况下会不断发出MessageToTheActor消息。UsingSink.actorRefWithAck确保actor在处理完当前消息之前不会收到另一条消息。

需要以下导入(显然,Akka Streams依赖项也是如此):

import akka.NotUsed;
import akka.actor.*;
import akka.stream.*;
import akka.stream.javadsl.*;

推荐阅读