首页 > 解决方案 > 回放实时收集的数据以模拟真实的交通延迟和消息排序

问题描述

有两个输入流,都产生定义为的对象实例

case class ReplayData(timestamp:Long, payload:Any)

流 1

1、有效载荷1

1000,有效载荷3

流 2

400,有效载荷2

1500,有效载荷4

我想实现重播机制,它将按我在​​每个元素上的时间戳排序的元素向下游推送

它将模拟生产中的实时场景。

这种机制需要遵守消息之间的延迟,例如第一条消息发送是有效载荷1(它的起点),来自Stream2的有效载荷2应该在400毫秒后发送(下一条消息时间戳和初始消息时间戳之间的差异)等等。

我可以使用DelayedQueue很容易地做到这一点,在这个SO 线程中解释了用法

延迟元素的无界阻塞队列,其中一个元素只有在其延迟到期时才能被取出。

队列的头部是延迟在过去过期最远的元素。如果没有延迟过期,则没有头,并且 poll 将返回 null。

当元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回的值小于或等于零时,就会过期。即使无法使用 take 或 poll 删除未过期的元素,它们也会被视为普通元素。

例如,size 方法返回过期和未过期元素的计数。此队列不允许空元素。不允许空元素。

我试图弄清楚如何在 Akka 流中做到这一点,但是很难找到可以为我解决这个问题的东西。

我将mergeSorted视为一种合并两个流并根据某些函数对它们进行排序的方法。

它似乎或多或少符合基于某些自定义功能进行排序的目的。

我不确定如何根据时间戳属性处理元素之间的延迟

使用普通的旧 AKKA,我可以使用调度程序来读取数据,对它们进行排序并安排每个元素在时间过去时发送。

标签: javascalaakkaakka-stream

解决方案


我不记得 akka-streams 中的任何内容可能会延迟开箱即用的消息,并为每条消息自定义延迟。毕竟 akka-streams 背后的想法是反应式编程。我只知道 2 个选项如何解决您的问题(假设您已经合并了 2 个来源)

  1. Flow.mapAsync - 在这种情况下,延迟一段时间后返回 a 完全是你的事Future。例如:

    import java.time.LocalDateTime
    import java.util.concurrent.Executors
    
    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.pattern.after
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.Source
    
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.concurrent.{ExecutionContext, Future}
    
    object Application extends App {
    
      implicit val sys: ActorSystem = ActorSystem()
      implicit val mat: ActorMaterializer = ActorMaterializer()
    
      case class SomeEntity(time: Int, value: Int)
      val source: Source[SomeEntity, NotUsed] = Source(List(100, 200, 400, 1000, 1100, 1400)).map(i => SomeEntity(i, i * i + 3))
    
      val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
      val scheduler = sys.scheduler
    
      val f = source
        .mapAsync(10)(se => after(se.time.milliseconds, scheduler)(Future.successful(se))(ec))
        .runForeach(se => println(s"${LocalDateTime.now()} -> $se"))
    
      f.onComplete(_ => sys.terminate())
    }
    
  2. 您的用例(毕竟是模拟)可能实际上并不那么严格,因此您可能会使用Flow.throttle。它不像第一种解决方案那么简单和精确,但它的性能要好得多,因为它使用了一些轻量级的桶模型来控制项目输出率。

    import java.time.LocalDateTime
    
    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.Source
    
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    
    object Application extends App {
    
      implicit val sys: ActorSystem = ActorSystem()
      implicit val mat: ActorMaterializer = ActorMaterializer()
    
      case class SomeEntity(time: Int, value: Int)
    
      val source: Source[SomeEntity, NotUsed] = Source(List(100, 200, 400, 1000, 1100, 1400, 1400, 1500, 1900, 2500, 2700)).map(i => SomeEntity(i, i * i + 3))
    
    
      val future = source.throttle(cost = 1, per = 1.millisecond, _.time).runForeach(se => {
        println(s"${LocalDateTime.now()} -> $se")
      })
    
      future.onComplete(_ => sys.terminate())
    }
    

推荐阅读