akka-stream - Akka Streams:初始化和处理接收器资源的最佳实践
问题描述
我有一个场景,我想将图表的结果写入 CSV。这包括文件的创建,文件编写器的初始化(我正在使用这个库),最后,在流完成后,我想再次处理/关闭编写器。
理想情况下,我想将此逻辑封装在接收器中,但我想知道添加初始化和处置逻辑的最佳实践/挂钩。
解决方案
给定任何类型的资源,而不仅仅是一个文件,它消耗数据元素并且也需要关闭:
type Data = ???
trait DataConsumer extends Function1[Data, Unit] with AutoCloseable
可以使用可以预先添加的方法创建一个Sink
在完成后关闭消费者的方法:watchTermination
Flow
def createDataConsumerSink(dataConsumer: DataConsumer) : Sink[Data,_] =
Flow[Data].watchTermination()( (_, f) => f foreach (_ => dataConsumer.close()))
.to(Sink.foreach[Data](dataConsumer.apply))
推荐阅读
- javascript - 如何在两个顶点之间划分边?
- r - reghelper 包:模型包含分类协变量时的 R graph_model 错误
- regex - 正则表达式拆分多行 - 按 ID 分组
- java - 使用 Jackson 将 Java 集序列化为 JavaScript 对象
- reactjs - React withRouter location prop 有时有键
- typeerror - 如何解决 rllib 中的“TypeError: can't pickle _thread.lock objects”
- linux - shell如何执行正常程序?
- c# - C# 从另一个表单在组合框中添加新值
- lstm - 如何使用 LSTM 文本分类进行预测?
- node.js - 如何将我的 koa 路由拆分为单独的文件?中间件问题