scala - 超时后停止 fs2-stream
问题描述
我想使用类似于take(n: Int)
但在时间维度上的函数:
consume(period: Duration
. 因此,如果发生超时,我希望流终止。我知道我可以将流编译成类似的东西IO[List[T]]
并取消它,但是我会丢失结果。实际上,我想将无尽的流转换为有限的流并保留结果。
更多关于更广泛的问题。我有来自消息传递代理的无穷无尽的事件流,但我也有轮换凭据来连接到代理。所以我想要的是消耗事件流一段时间,然后停止,获取新凭据,再次连接到创建新流的代理并将两个流连接为一个。
解决方案
有一种方法可以做到这一点:
/**
* Interrupts this stream after the specified duration has passed.
*/
def interruptAfter[F2[x] >: F[x]: Concurrent: Timer](duration: FiniteDuration): Stream[F2, O]
推荐阅读
- android-studio - 当我重新启动并加载旧版本的应用程序时,我的 Flutter 应用程序没有更新
- html - 使用溢出的侧面导航定位粘性
- javascript - 随机消息发送未按预期工作 discord.js
- javascript - 使用 React Redux 的私有路由
- php - 在 PHP 中访问 POST 提交函数之外的变量
- javascript - d3.v4:如何使用 linkRadial 而不是 linkVertical/Horizontal
- flutter - How to add variables in a class programmatically in dart?
- android - 如何使用 Retrofit/Moshi 解析嵌套的 JSON 对象
- sql - 是否可以锁定 SQL Server 中列的值?
- java-8 - 重构 Java 8 流代码 - 代码重复