java - 回放实时收集的数据以模拟真实的交通延迟和消息排序
问题描述
有两个输入流,都产生定义为的对象实例
case class ReplayData(timestamp:Long, payload:Any)
流 1
1、有效载荷1
1000,有效载荷3
流 2
400,有效载荷2
1500,有效载荷4
我想实现重播机制,它将按我在每个元素上的时间戳排序的元素向下游推送
它将模拟生产中的实时场景。
这种机制需要遵守消息之间的延迟,例如第一条消息发送是有效载荷1(它的起点),来自Stream2的有效载荷2应该在400毫秒后发送(下一条消息时间戳和初始消息时间戳之间的差异)等等。
我可以使用DelayedQueue很容易地做到这一点,在这个SO 线程中解释了用法
延迟元素的无界阻塞队列,其中一个元素只有在其延迟到期时才能被取出。
队列的头部是延迟在过去过期最远的元素。如果没有延迟过期,则没有头,并且 poll 将返回 null。
当元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回的值小于或等于零时,就会过期。即使无法使用 take 或 poll 删除未过期的元素,它们也会被视为普通元素。
例如,size 方法返回过期和未过期元素的计数。此队列不允许空元素。不允许空元素。
我试图弄清楚如何在 Akka 流中做到这一点,但是很难找到可以为我解决这个问题的东西。
我将mergeSorted视为一种合并两个流并根据某些函数对它们进行排序的方法。
它似乎或多或少符合基于某些自定义功能进行排序的目的。
我不确定如何根据时间戳属性处理元素之间的延迟。
使用普通的旧 AKKA,我可以使用调度程序来读取数据,对它们进行排序并安排每个元素在时间过去时发送。
解决方案
我不记得 akka-streams 中的任何内容可能会延迟开箱即用的消息,并为每条消息自定义延迟。毕竟 akka-streams 背后的想法是反应式编程。我只知道 2 个选项如何解决您的问题(假设您已经合并了 2 个来源)
Flow.mapAsync - 在这种情况下,延迟一段时间后返回 a 完全是你的事
Future
。例如:import java.time.LocalDateTime import java.util.concurrent.Executors import akka.NotUsed import akka.actor.ActorSystem import akka.pattern.after import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} object Application extends App { implicit val sys: ActorSystem = ActorSystem() implicit val mat: ActorMaterializer = ActorMaterializer() case class SomeEntity(time: Int, value: Int) val source: Source[SomeEntity, NotUsed] = Source(List(100, 200, 400, 1000, 1100, 1400)).map(i => SomeEntity(i, i * i + 3)) val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10)) val scheduler = sys.scheduler val f = source .mapAsync(10)(se => after(se.time.milliseconds, scheduler)(Future.successful(se))(ec)) .runForeach(se => println(s"${LocalDateTime.now()} -> $se")) f.onComplete(_ => sys.terminate()) }
您的用例(毕竟是模拟)可能实际上并不那么严格,因此您可能会使用Flow.throttle。它不像第一种解决方案那么简单和精确,但它的性能要好得多,因为它使用了一些轻量级的桶模型来控制项目输出率。
import java.time.LocalDateTime import akka.NotUsed import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ object Application extends App { implicit val sys: ActorSystem = ActorSystem() implicit val mat: ActorMaterializer = ActorMaterializer() case class SomeEntity(time: Int, value: Int) val source: Source[SomeEntity, NotUsed] = Source(List(100, 200, 400, 1000, 1100, 1400, 1400, 1500, 1900, 2500, 2700)).map(i => SomeEntity(i, i * i + 3)) val future = source.throttle(cost = 1, per = 1.millisecond, _.time).runForeach(se => { println(s"${LocalDateTime.now()} -> $se") }) future.onComplete(_ => sys.terminate()) }
推荐阅读
- javascript - 如何为多次使用相同钩子的屏幕创建测试?反应测试库
- javascript - 错误:用于查看 PDF 的角度项目中的对象可能为“空”
- ssl - 看不到来自 Charles 的移动应用请求
- c# - 将 JSON 后模型解包为操作参数
- python - 如何为“国家”键解决此 KeyError?
- loops - 将后跟与模式匹配的行打印到文件 A,如果不匹配则打印到文件 B
- javascript - Svelte:如何通知子组件或兄弟组件
- python - Python递归函数不返回
- xpages - 如何在 Xpages 中为 xp:inputText 重置/清除 xp:typeAhead?
- java - .xsd 架构导入我在本地有文件的命名空间,但无法从外部获取。如何实施?