java - Akka Scheduler:仅在当前运行完成时运行下一个
问题描述
使用 Akka Scheduler 调度作业是这样的(至少从文档中):
system.scheduler().schedule(
Duration.Zero(),
Duration.create(5, TimeUnit.SECONDS),
workerActor,
new MessageToTheActor(),
system.dispatcher(), ActorRef.noSender());
但是,我不明白如何确保下一次运行仅在当前运行完成时发生。我一直在四处寻找没有成功:(
解决方案
对于您的用例,调度程序是错误的工具。
另一种选择是 Akka Stream 的Sink.actorRefWithAck
(下面的代码改编自链接文档中的示例,并借用了那里定义的实用程序类)。您需要调整工作角色以处理与流状态相关的一些消息并回复确认消息。确认消息充当背压信号,并指示参与者已准备好处理下MessageToTheActor
一条消息。工人演员看起来像下面这样:
enum Ack {
INSTANCE;
}
static class StreamInitialized {}
static class StreamCompleted {}
static class StreamFailure {
private final Throwable cause;
public StreamFailure(Throwable cause) { this.cause = cause; }
public Throwable getCause() { return cause; }
}
public class MyWorker extends AbstractLoggingActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(StreamInitialized.class, init -> {
log().info("Stream initialized");
sender().tell(Ack.INSTANCE, self());
})
.match(MessageToTheActor.class, msg -> {
log().info("Received message: {}", msg);
// do something with the message...
sender().tell(Ack.INSTANCE, self());
})
.match(StreamCompleted.class, completed -> {
log().info("Stream completed");
})
.match(StreamFailure.class, failed -> {
log().error(failed.getCause(),"Stream failed!");
})
.build();
}
}
Sink.actorRefWithAck
与上述演员一起使用:
final ActorSystem system = ActorSystem.create("MySystem");
final Materializer materializer = ActorMaterializer.create(system);
ActorRef workerActor = system.actorOf(Props.create(MyWorker.class, "worker"));
Source<MessageToTheActor, NotUsed> messages = Source.repeat(new MessageToTheActor());
Sink<String, NotUsed> sink = Sink.<String>actorRefWithAck(
workerActor,
new StreamInitialized(),
Ack.INSTANCE,
new StreamCompleted(),
ex -> new StreamFailure(ex)
);
messages.runWith(sink, materializer);
请注意 的使用Source.repeat
,在这种情况下会不断发出MessageToTheActor
消息。UsingSink.actorRefWithAck
确保actor在处理完当前消息之前不会收到另一条消息。
需要以下导入(显然,Akka Streams依赖项也是如此):
import akka.NotUsed;
import akka.actor.*;
import akka.stream.*;
import akka.stream.javadsl.*;
推荐阅读
- python - 在 Python (OpenCV) 中计算人脸旋转
- swiftui - SwiftUI“不能在属性初始化器中使用实例成员'numberOfDevice';属性初始化器在'self'可用之前运行”错误
- selenium - can I run a selenium driver with Quarkus?
- android - 如何将 ListView 构建器与 Providers 一起使用?
- sql - VLOOKUP 或多连接列
- html - 使用超过 1 个样式表的图标导致高度/对齐问题
- flutter - Flutter 应用程序冻结
- python-3.x - 我的 PyPI 安装包无法识别 chromedriver 文件
- django - 有没有办法让我设置虚拟环境而不会出现以下错误?
- python - 如何比较数字子列表中的第一项,如果重复,比较第二项并选择第二项最小的子列表?