首页 > 解决方案 > 如何检测 Akka Stream 中特定持续时间的间隙?

问题描述

我不认为它目前(Akka Stream 2.5.21)可以并且对简单的解决方法感兴趣,或者看到它的人可能是 Akka Stream 库本身的一部分。

我目前的解决方法是:

/*
* Implement a flow that does the 'action' when there is a gap of minimum 'duration' in the stream.
*/
def onGap[T](duration: FiniteDuration, action: => Unit): Flow[T,T,NotUsed] = {
  Flow[T]
    .map(Some(_))
    .keepAlive(duration, () => { action; None })
    .collect{ case Some(x) => x }
}

我想看到的是类似于.keepAlive,但每个间隙只点燃一次,而不是向流中注入一个条目。

我考虑的其他方法:

我的用例是分析(猜测) Kinesis 流的消耗已达到“当前状态”(已看到历史值)。

会有更好的方法吗?

标签: akka-stream

解决方案


推荐阅读