首页 > 解决方案 > 将 ZipWith Outlet 连接到另一个 ZipWith 作为 Inlet

问题描述

我无法定义 a ZipWith outas Inof another ZipWith。它只从其中一个事件中获取单个事件,kafka topics然后其余的流程不起作用,并且它没有得到任何其他事件。SinglezipWith为 2 工作kafka datasources。每当我将pmf和连接pm1pm2's出口引入到入口时pmf,它都不起作用:(你能帮忙吗?

数据是从 中检索的Kafka topics,它与datasourcesSource(1 到 100) 类似。Kafka datasource有不同的行为我所有的测试都是用 dummy 完成的datasources

package sample

import akka.{Done}  
import akka.actor.ActorSystem  
import akka.kafka.{ConsumerSettings, Subscriptions}  
import akka.kafka.scaladsl.Consumer  
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape}  
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source, ZipWith}  
import org.apache.kafka.clients.consumer.ConsumerConfig  
import org.apache.kafka.common.serialization.StringDeserializer  

import scala.concurrent.Future

object KafkaApp extends App {
  implicit val system = ActorSystem("StreamBuilder2323")
  implicit val materializer = ActorMaterializer()

  private val s1: Source[String, Consumer.Control] = initializeKafkaSource("topic1",system)
  private val s2: Source[String, Consumer.Control] = initializeKafkaSource("topic2",system)


  private val s3: Source[String, Consumer.Control] = initializeKafkaSource("topic3",system)

  private val s4:Source[String, Consumer.Control] = initializeKafkaSource("topic4",system)


  val concat = GraphDSL.create() { implicit b ⇒
    val zip = b.add(ZipWith[String,String,String](concatFunc _))
    UniformFanInShape(zip.out, zip.in0, zip.in1)
  }

  def concatFunc(s1:String, s2:String): String ={
    s1 + " _ " + s2
  }

  def printss(s:String): Unit ={
    print(s)
  }


  val sinkFinal = Sink.foreach(printss)

  val g = RunnableGraph.fromGraph(GraphDSL.create(sinkFinal) { implicit b ⇒     sink ⇒
    import GraphDSL.Implicits._
    val pm1 = b.add(concat)
    val pm2 = b.add(concat)
    val pmf = b.add(concat)
    s1  ~> pm1.in(0)
    s2  ~> pm1.in(1)

    s3  ~> pm2.in(0)
    s4  ~> pm2.in(1)

    pm1.out ~> pmf.in(0)
    pm2.out ~> pmf.in(1)

    pmf.out ~> sink.in

    ClosedShape
  })

  val max: Future[Done] = g.run()

  def initializeKafkaSource(topicName : String, system : ActorSystem): Source[String,Consumer.Control] ={
    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("consumerGroup")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val subscription = Subscriptions.topics(topicName)
    val streamSource = Consumer.plainSource(consumerSettings, subscription).map(s => s.value())
    streamSource
  }
}

标签: scalaapache-kafkaakkaakka-stream

解决方案


推荐阅读