scala - 如何在 Flow 中使用确认语义?
问题描述
我想设计一个流程,当ActorRef
收到一些确认消息时,它只会从上游拉。
我的用例是使用一些静态集群参与者来完成工作。我有一个监护人,负责维护是否有可用的工人。当演员变得可用时,它将发布一条消息。我希望我的 Flow 能够知道此消息以拉动作业并将其推送到下游。否则会反压。
阅读文档时,GraphStage
API 似乎没有一些反应性组件,依赖InHandler
s 进行自定义处理:
setHandler(in, new InHandler {
override def onPush(): Unit = {
println(grab(in))
pull(in)
}
})
我可能会在此方法中轮询可用演员数量的更新,但这不是反应性的。
Akka 是否默认支持 Flows 中的确认?
解决方案
推不拉
Actor
一旦 an空闲,您可以Source
改为发送数据,而不是 让流信号需求上游。
您可以创建一个将具体化为ActorRef
将通知免费 Actor 的 Source:
object ActorIsFreeMessage
val source : Source[ActorIsFreeMessage, ActorRef] = Source.actorRef(???, ???)
然后,您可以在 Source 收到消息后附加一个Flow
将“拉作业”的内容:
type Job = ???
val pullAJob : () => Job = ???
val jobFlow : Flow[ActorIsFreeMessage, Job] =
Flow[ActorIsFreeMessage].map[Job](_ => pullAJob())
val jobSource : Source[Job, ActorRef] = source via jobFlow
一旦这个新的 Source 被附加到 Sink 上,其他 Actors 就可以向物化对象发送消息ActorRef
:
val jobSink : Sink[Job, ActorRef] = ???
val streamRef = jobSource.to(jobSink).run()
//inside of the Actor
streamRef ! ActorIsFreeMessage
推荐阅读
- mysql - io.vertx.mysqlclient.MySQLPool.query ("").execute 从未真正执行过并且什么也不返回
- c++ - 使用具有 C++20 概念的继承类从模板类继承
- javascript - 使用 MongoExport 从 MongoDB 导出数据并查询
- javascript - 以具有相同类的一个元素元素为目标
- xml - 经典 JScript 的正确文件嵌入
- windows-store-apps - 我必须做什么才能通过 Microsoft Store 应用程序提交过程的“商店列表”部分?
- java - 如何循环询问文件名,同时仍包含 FileNotFoundException 消息?
- spring - Spring Boot 未设置 cookie
- java - Tableview 仅显示添加到 observableList() 的最后一个输入
- javascript - Bootstrap 3 选项卡在本地工作但不在远程服务器上