akka-stream - 如何检测 Akka Stream 中特定持续时间的间隙?
问题描述
我不认为它目前(Akka Stream 2.5.21)可以并且对简单的解决方法感兴趣,或者看到它的人可能是 Akka Stream 库本身的一部分。
我目前的解决方法是:
/*
* Implement a flow that does the 'action' when there is a gap of minimum 'duration' in the stream.
*/
def onGap[T](duration: FiniteDuration, action: => Unit): Flow[T,T,NotUsed] = {
Flow[T]
.map(Some(_))
.keepAlive(duration, () => { action; None })
.collect{ case Some(x) => x }
}
我想看到的是类似于.keepAlive
,但每个间隙只点燃一次,而不是向流中注入一个条目。
我考虑的其他方法:
.idleTimeout(duration)
withSupervision.Decider
,但这需要单独ActorSystem
创建。GraphStage
但那些总是感觉复杂..
我的用例是分析(猜测) Kinesis 流的消耗已达到“当前状态”(已看到历史值)。
会有更好的方法吗?
解决方案
推荐阅读
- python - 多处理:不能在文件之间共享变量
- server - Godaddy 服务器上的数据在最新数据和 1 年前的数据之间不断波动。请解决
- python - Python jsonschema 没有引发异常
- python - Python:Telnet Libray,等待提示
- javascript - firebase 8.4 重复运行 .once()
- html - 孩子不在父 div 内
- java - 在 Spring Rest Controller 中执行的方法比在纯 java 项目中执行的方法要慢得多
- html - 如何让列表中的水平线跨越到 HTML 中的行尾?
- eslint - 如何在没有“定义前使用”错误的情况下使用“setInterval”?
- android - 当应用程序被杀死时无法获取 FCM 通知android