scala - 将 ZipWith Outlet 连接到另一个 ZipWith 作为 Inlet
问题描述
我无法定义 a ZipWith out
as In
of another ZipWith
。它只从其中一个事件中获取单个事件,kafka topics
然后其余的流程不起作用,并且它没有得到任何其他事件。SinglezipWith
为 2 工作kafka
datasources
。每当我将pmf
和连接pm1
和pm2's
出口引入到入口时pmf
,它都不起作用:(你能帮忙吗?
数据是从 中检索的Kafka topics
,它与datasources
Source(1 到 100) 类似。Kafka datasource
有不同的行为我所有的测试都是用 dummy 完成的datasources
。
package sample
import akka.{Done}
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source, ZipWith}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Future
object KafkaApp extends App {
implicit val system = ActorSystem("StreamBuilder2323")
implicit val materializer = ActorMaterializer()
private val s1: Source[String, Consumer.Control] = initializeKafkaSource("topic1",system)
private val s2: Source[String, Consumer.Control] = initializeKafkaSource("topic2",system)
private val s3: Source[String, Consumer.Control] = initializeKafkaSource("topic3",system)
private val s4:Source[String, Consumer.Control] = initializeKafkaSource("topic4",system)
val concat = GraphDSL.create() { implicit b ⇒
val zip = b.add(ZipWith[String,String,String](concatFunc _))
UniformFanInShape(zip.out, zip.in0, zip.in1)
}
def concatFunc(s1:String, s2:String): String ={
s1 + " _ " + s2
}
def printss(s:String): Unit ={
print(s)
}
val sinkFinal = Sink.foreach(printss)
val g = RunnableGraph.fromGraph(GraphDSL.create(sinkFinal) { implicit b ⇒ sink ⇒
import GraphDSL.Implicits._
val pm1 = b.add(concat)
val pm2 = b.add(concat)
val pmf = b.add(concat)
s1 ~> pm1.in(0)
s2 ~> pm1.in(1)
s3 ~> pm2.in(0)
s4 ~> pm2.in(1)
pm1.out ~> pmf.in(0)
pm2.out ~> pmf.in(1)
pmf.out ~> sink.in
ClosedShape
})
val max: Future[Done] = g.run()
def initializeKafkaSource(topicName : String, system : ActorSystem): Source[String,Consumer.Control] ={
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("consumerGroup")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val subscription = Subscriptions.topics(topicName)
val streamSource = Consumer.plainSource(consumerSettings, subscription).map(s => s.value())
streamSource
}
}
解决方案
推荐阅读
- python - 我的正则表达式错过了我要匹配的项目之一。有什么我想念的吗?
- selenium - 如何扫描页面以查找输入类型元素并一个一个地单击所有这些元素?
- unity3d - 制作像 Marble Blast Gold 中的相机一样的相机
- django - 使用 Django 管理应用程序并禁用会话中间件?
- php - 本地开发环境中 Laravel Sanctum 的 Cookie 问题
- c++ - 有没有更简洁的方法来编写模板函数定义?
- credit-card - emv APDU 响应 6581. 锁定 VISA 卡?
- python - 用于嵌套列表中的列表的 if-in 语句测试无法识别第一个列表在函数中使用时位于第二个列表中
- silverstripe - Silverstripe 3.7 站点搜索 - 查询失败:错误:语法错误
- github - 将对象作为变量传递给 Github 操作