首页 > 解决方案 > 如何使用 Akka 流中的值来实例化 GooglePubSub 流?

问题描述

我正在尝试创建一个Flow用于Source队列的。我希望这可以与 Alpakka Google PubSub 连接器一起使用:https ://doc.akka.io/docs/alpakka/current/google-cloud-pub-sub.html

为了使用这个连接器,我需要创建一个Flow取决于提供为 a 的主题名称String,如上面的链接和代码片段中所示。

val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
  GooglePubSub.publish(topic, config)

问题

我希望能够设置一个Source队列来接收发布消息所需的主题和消息。我首先创建出必要PublishRequest的消息String。然后我想通过Flow运行实例化来运行它GooglePubSub.publish(topic, config)。但是,我不知道如何将主题带到流程的那一部分。

val gcFlow: Flow[(String, String), PublishRequest, NotUsed] = Flow[(String, String)]
  .map(messageData => {
    PublishRequest(Seq(
      PubSubMessage(new String(Base64.getEncoder.encode(messageData._1.getBytes))))
      )
    })
  .via(GooglePubSub.publish(topic, config))

val bufferSize = 10
val elementsToProcess = 5

// newSource is a Source[PublishRequest, NotUsed]
val (queue, newSource) = Source
  .queue[(String, String)](bufferSize, OverflowStrategy.backpressure)
  .via(gcFlow)
  .preMaterialize()

我不确定是否有办法让主题进入队列而不是初始数据流的一部分。而且我不知道如何将流值转换为动态Flow.

如果我不恰当地使用了某些术语,请记住我是新手。

标签: scalaakkaakka-streamalpakka

解决方案


您可以通过使用flatMapConcat并在其中生成一个新的来实现Source它:

// using tuple assuming (Topic, Message)
val gcFlow: Flow[(String, String), (String, PublishRequest), NotUsed] = Flow[(String, String)]
    .map(messageData => {
      val pr = PublishRequest(immutable.Seq(
        PubSubMessage(new String(Base64.getEncoder.encode(messageData._2.getBytes)))))
      // output flow shape of (String, PublishRequest)
      (messageData._1, pr)
    })

val publishFlow: Flow[(String, PublishRequest), Seq[String], NotUsed] =
Flow[(String, PublishRequest)].flatMapConcat {
    case (topic: String, pr: PublishRequest) =>
      // Create a Source[PublishRequest]
      Source.single(pr).via(GooglePubSub.publish(topic, config))
  }

// wire it up
val (queue, newSource) = Source
    .queue[(String, String)](bufferSize, OverflowStrategy.backpressure)
    .via(gcFlow)
    .via(publishFlow)
    .preMaterialize()

或者,您可以用案例类替换元组以更好地记录

case class Something(topic: String, payload: PublishRequest)

// output flow shape of Something[String, PublishRequest]
Something(messageData._1, pr)

Flow[Something[String, PublishRequest]].flatMapConcat { s =>
  Source.single(s.payload)... // etc
}

解释:

在我们输出通过gcFlow的元组的 FlowShape 。输入是元组,我们生成新的流过(String, PublishRequest)publishFlow(String, PublishRequest)flatMapConcatSource[PublishRequest]GooglePubSub.publish

为每个项目创建新的源会有轻微的开销。这不应该对性能产生可衡量的影响


推荐阅读