首页 > 解决方案 > 如何在 Flow 中使用确认语义?

问题描述

我想设计一个流程,当ActorRef收到一些确认消息时,它只会从上游拉。

我的用例是使用一些静态集群参与者来完成工作。我有一个监护人,负责维护是否有可用的工人。当演员变得可用时,它将发布一条消息。我希望我的 Flow 能够知道此消息以拉动作业并将其推送到下游。否则会反压。

阅读文档时,GraphStageAPI 似乎没有一些反应性组件,依赖InHandlers 进行自定义处理:

setHandler(in, new InHandler {
  override def onPush(): Unit = {
    println(grab(in))
    pull(in)
  }
})

我可能会在此方法中轮询可用演员数量的更新,但这不是反应性的。

Akka 是否默认支持 Flows 中的确认?

标签: scalaakkaakka-stream

解决方案


推不拉

Actor一旦 an空闲,您可以Source改为发送数据,而不是 让流信号需求上游。

您可以创建一个将具体化为ActorRef将通知免费 Actor 的 Source:

object ActorIsFreeMessage

val source : Source[ActorIsFreeMessage, ActorRef] = Source.actorRef(???, ???)

然后,您可以在 Source 收到消息后附加一个Flow将“拉作业”的内容:

type Job = ???

val pullAJob : () => Job = ???

val jobFlow : Flow[ActorIsFreeMessage, Job] = 
  Flow[ActorIsFreeMessage].map[Job](_ => pullAJob())

val jobSource : Source[Job, ActorRef] = source via jobFlow

一旦这个新的 Source 被附加到 Sink 上,其他 Actors 就可以向物化对象发送消息ActorRef

val jobSink : Sink[Job, ActorRef] = ???

val streamRef = jobSource.to(jobSink).run()

//inside of the Actor
streamRef ! ActorIsFreeMessage

推荐阅读