sockets - Alpakka UDP:如何通过已经绑定的套接字响应接收到的数据报?
问题描述
我正在使用 Alpakkas UDP.bindFlow将传入的 UDP 数据报转发到 Kafka 代理。发送这些数据报的旧应用程序需要来自消息发送到的同一端口的 UDP 响应。我正在努力为这种行为建模,因为它需要我将流的输出连接到它的输入。
我尝试了这个解决方案,但它不起作用,因为响应数据报是从不同的源端口发送的:
import java.net.InetSocketAddress
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.alpakka.udp.Datagram
import akka.stream.alpakka.udp.scaladsl.Udp
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
object UdpInput extends App {
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
val socket = new InetSocketAddress("0.0.0.0", 40000)
val udpBindFlow = Udp.bindFlow(socket)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
val kafkaSink = Flow[Datagram].map(toProducerRecord).to(Producer.plainSink(producerSettings))
def toProducerRecord(datagram: Datagram) = new ProducerRecord[String, String]("udp", datagram.data.utf8String)
def toResponseDatagram(datagram: Datagram) = Datagram(ByteString("OK"), datagram.remote)
// Does not model the behaviour I'm looking for because
// the response datagram is sent from a different source port
Source.asSubscriber
.via(udpBindFlow)
.alsoTo(kafkaSink)
.map(toResponseDatagram)
.to(Udp.sendSink)
.run
}
解决方案
我最终使用 GraphDSL 来实现循环图。感谢dvim为我指明了正确的方向!
import java.net.InetSocketAddress
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.alpakka.udp.Datagram
import akka.stream.alpakka.udp.scaladsl.Udp
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, MergePreferred, RunnableGraph, Source}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.util.ByteString
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
object UdpInput extends App {
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
val socket = new InetSocketAddress("0.0.0.0", 40000)
val udpBindFlow = Udp.bindFlow(socket)
val udpResponseFlow = Flow[Datagram].map(toResponseDatagram)
val kafkaSink = Flow[Datagram].map(toProducerRecord).to(Producer.plainSink(producerSettings))
def toProducerRecord(datagram: Datagram) = new ProducerRecord[String, String]("udp", datagram.data.utf8String)
def toResponseDatagram(datagram: Datagram) = Datagram(ByteString("OK"), datagram.remote)
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
val merge = b.add(MergePreferred[Datagram](1))
val bcast = b.add(Broadcast[Datagram](2))
Source.asSubscriber ~> merge ~> udpBindFlow ~> bcast ~> kafkaSink
merge.preferred <~ udpResponseFlow <~ bcast
ClosedShape
}).run
}
推荐阅读
- git - 如何在分支之间切换时链接到 git repo 的内容
- python - 将一个字典的值与另一字典的键匹配(Python)
- html - 照片库过滤器在第一次加载时看起来损坏但在我刷新时看起来很好?
- docker - Hsaura docker-compose 文件返回错误 - 以代码 0 退出
- angular - 在 angular 中编写角度配置
- graphql - 我怎样才能 spyOn `useMutation` 以便我可以看到传递的值是什么?
- nginx - 通过 nginx-rtmp 流式传输树莓派相机
- node.js - 暂停转换流时会发生什么?
- python - 尝试将下拉选择从 HTML 传递到 Flask
- javascript - 错误:未捕获的类型错误:无法读取 null 的属性“getContext”