scala - 流会被终止吗?
问题描述
我有演员,如下所示:
正如您在图像上看到的,ActorStream
是 的子级Actor
。问题是,当我终止 时Actor
,也会ActorStream
终止吗?
这是我如何创建ActorStream
in 的方式Actor
:
def create(fsm: ActorRef[ServerHealth], cancel: Option[Cancellable]): Behavior[ServerHealthStreamer] =
Behaviors.setup { context =>
implicit val system = context.system
implicit val materializer = ActorMaterializer()
implicit val dispatcher = materializer.executionContext
val kafkaServer = system
.settings
.config
.getConfig("kafka")
.getString("servers")
val sink: Sink[ServerHealth, NotUsed] = ActorSink.actorRefWithAck[ServerHealth, ServerHealthStreamer, Ack](
ref = context.self,
onCompleteMessage = Complete,
onFailureMessage = Fail.apply,
messageAdapter = Message.apply,
onInitMessage = Init.apply,
ackMessage = Ack)
val cancel = Source.tick(1.seconds, 15.seconds, NotUsed)
.flatMapConcat(_ => Source.fromFuture(health(kafkaServer)))
.map {
case true =>
KafkaActive
case false =>
KafkaInactive
}
.to(sink)
.run()
Behaviors.receiveMessage {
case Init(ackTo) =>
ackTo ! Ack
Behaviors.same
case Message(ackTo, msg) =>
fsm ! msg
ackTo ! Ack
create(fsm, Some(cancel))
case Complete =>
Behaviors.same
case Fail(_) =>
fsm ! KafkaInactive
Behaviors.same
}
}
解决方案
在您的情况下,演员终止必须终止流,因为在幕后舞台演员观看通过了 actorRef 并在 Terminated 到达时完成阶段
我想你可以在这里找到更多信息 https://blog.colinbreck.com/integrating-akka-streams-and-akka-actors-part-ii/
要理解的一个极其重要的方面是物化流作为一组参与者在分配它们的执行上下文的线程上运行。换句话说,流独立于分配它的参与者运行。如果流是长时间运行的,甚至是无限的,这将变得非常重要,并且我们希望 Actor 管理流的生命周期,这样当 Actor 停止时,流就会终止。扩展上面的例子,我将使流无限并使用 KillSwitch 来管理流的生命周期。
推荐阅读
- email - Jenkins - 如何在 FxCop 报告中发送包含警告和问题的电子邮件
- python - 遍历 pandas 系列
- r - 对相似列求和
- python - 针对 2 测试 Python 方法
- javascript - JavaScript 表单验证自定义消息和最小字段长度
- javascript - Angularjs - 如何使用解析器重定向到另一个状态
- c# - C# Sqlite '约束失败'
- apache-spark - 从镶木地板中删除数据会导致它的大小*增长* - 为什么?
- ios - 有没有办法在不降低返回图像质量的情况下获得 UIImage 的镜像(水平/垂直)?
- javascript - Vue 自定义指令