scala - 在流中发送 actorRefWithAck
问题描述
我正在使用这个线程的答案,因为我需要特别对待第一个元素。问题是,我需要将此数据发送到另一个 Actor 或在本地保存(这是不可能的)。
所以,我的流看起来像这样:
val flow: Flow[Message, Message, (Future[Done], Promise[Option[Message]])] = Flow.fromSinkAndSourceMat(
Flow[Message].mapAsync[Trade](1) {
case TextMessage.Strict(text) =>
Unmarshal(text).to[Trade]
case streamed: TextMessage.Streamed =>
streamed.textStream.runFold("")(_ ++ _).flatMap(Unmarshal(_).to[Trade])
}.groupBy(pairs.size, _.s).prefixAndTail(1).flatMapConcat {
case (head, tail) =>
// sending first element here
val result = Source(head).to(Sink.actorRefWithAck(
ref = actor,
onInitMessage = Init,
ackMessage = Ack,
onCompleteMessage = "done"
)).run()
// some kind of operation on the result
Source(head).concat(tail)
}.mergeSubstreams.toMat(sink)(Keep.right),
Source.maybe[Message])(Keep.both)
这是一个好习惯吗?会不会产生意想不到的后果?不幸的是,我无法调用persist
内部流,所以我想将此数据发送到外部系统。
解决方案
您当前的方法不会result
以任何方式使用,因此更简单的替代方法是触发并忘记第一个Message
演员:
groupBy(pairs.size, _.s).prefixAndTail(1).flatMapConcat {
case (head, tail) =>
// sending first element here
actor ! head.head
Source(head).concat(tail)
}
然后,参与者不必担心处理Init
和发送Ack
消息,并且可以只关心持久化Message
实例。
推荐阅读
- python - 如何在 pyinstaller 中只包含需要的模块?
- c# - 如何在 C# 中将字节数组读取为多维数组?
- python - 使用 Python/正则表达式优化字符串
- tensorflow - SegNet:为什么训练准确率高而验证准确率低?
- javascript - SlicePipe 角函数
- node.js - hyperledger fabric sdk node orderer, node client在deadline前连接失败
- java - 无法解析到JSP编译中的类型手动部署在tomcat中
- reactjs - AWS Amplify react Connect 组件中的多个订阅
- java - 从Java中的文本文件中读取记录
- redux - `fields` 在 `ReduxForm(
)`,但其值为`undefined`