scala - 如何使用 Akka 流中的值来实例化 GooglePubSub 流?
问题描述
我正在尝试创建一个Flow
用于Source
队列的。我希望这可以与 Alpakka Google PubSub 连接器一起使用:https ://doc.akka.io/docs/alpakka/current/google-cloud-pub-sub.html
为了使用这个连接器,我需要创建一个Flow
取决于提供为 a 的主题名称String
,如上面的链接和代码片段中所示。
val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
GooglePubSub.publish(topic, config)
问题
我希望能够设置一个Source
队列来接收发布消息所需的主题和消息。我首先创建出必要PublishRequest
的消息String
。然后我想通过Flow
运行实例化来运行它GooglePubSub.publish(topic, config)
。但是,我不知道如何将主题带到流程的那一部分。
val gcFlow: Flow[(String, String), PublishRequest, NotUsed] = Flow[(String, String)]
.map(messageData => {
PublishRequest(Seq(
PubSubMessage(new String(Base64.getEncoder.encode(messageData._1.getBytes))))
)
})
.via(GooglePubSub.publish(topic, config))
val bufferSize = 10
val elementsToProcess = 5
// newSource is a Source[PublishRequest, NotUsed]
val (queue, newSource) = Source
.queue[(String, String)](bufferSize, OverflowStrategy.backpressure)
.via(gcFlow)
.preMaterialize()
我不确定是否有办法让主题进入队列而不是初始数据流的一部分。而且我不知道如何将流值转换为动态Flow
.
如果我不恰当地使用了某些术语,请记住我是新手。
解决方案
您可以通过使用flatMapConcat
并在其中生成一个新的来实现Source
它:
// using tuple assuming (Topic, Message)
val gcFlow: Flow[(String, String), (String, PublishRequest), NotUsed] = Flow[(String, String)]
.map(messageData => {
val pr = PublishRequest(immutable.Seq(
PubSubMessage(new String(Base64.getEncoder.encode(messageData._2.getBytes)))))
// output flow shape of (String, PublishRequest)
(messageData._1, pr)
})
val publishFlow: Flow[(String, PublishRequest), Seq[String], NotUsed] =
Flow[(String, PublishRequest)].flatMapConcat {
case (topic: String, pr: PublishRequest) =>
// Create a Source[PublishRequest]
Source.single(pr).via(GooglePubSub.publish(topic, config))
}
// wire it up
val (queue, newSource) = Source
.queue[(String, String)](bufferSize, OverflowStrategy.backpressure)
.via(gcFlow)
.via(publishFlow)
.preMaterialize()
或者,您可以用案例类替换元组以更好地记录它
case class Something(topic: String, payload: PublishRequest)
// output flow shape of Something[String, PublishRequest]
Something(messageData._1, pr)
Flow[Something[String, PublishRequest]].flatMapConcat { s =>
Source.single(s.payload)... // etc
}
解释:
在我们输出通过gcFlow
的元组的 FlowShape 。输入是元组,我们生成新的流过(String, PublishRequest)
publishFlow
(String, PublishRequest)
flatMapConcat
Source[PublishRequest]
GooglePubSub.publish
为每个项目创建新的源会有轻微的开销。这不应该对性能产生可衡量的影响
推荐阅读
- javascript - 如何使用 href 链接获得第二个选项(我有 2 个)
- http - 在 Flutter 中添加证书
- r - R:仅在数据框中包含工作日的观察结果以及所有日期的观察结果
- javascript - 提交表单后重定向到新页面,显示内容并下载文件
- javascript - 在 Array of Arrays 中找到属性名称并更改(删除)它时,您将如何搜索?
- sql - 为什么将表达式转换为数据类型 int 的算术溢出错误会自行解决?
- python - 如何在 OR 列表正则表达式中找到尽可能少的捕获组
- sql - 获取不同的值sql server 2016
- sql - 获取同一列的当前数据和上个月数据
- r - 将 (+) 1 添加到数据框中有 0 的每一行