首页 > 解决方案 > Akka Streams:初始化和处理接收器资源的最佳实践

问题描述

我有一个场景,我想将图表的结果写入 CSV。这包括文件的创建,文件编写器的初始化(我正在使用这个库),最后,在流完成后,我想再次处理/关闭编写器。

理想情况下,我想将此逻辑封装在接收器中,但我想知道添加初始化和处置逻辑的最佳实践/挂钩。

标签: akka-stream

解决方案


给定任何类型的资源,而不仅仅是一个文件,它消耗数据元素并且也需要关闭

type Data = ???

trait DataConsumer extends Function1[Data, Unit] with AutoCloseable

可以使用可以预先添加的方法创建一个Sink在完成后关闭消费者的方法:watchTerminationFlow

def createDataConsumerSink(dataConsumer: DataConsumer) : Sink[Data,_] = 
  Flow[Data].watchTermination()( (_, f) => f foreach (_ => dataConsumer.close()))
            .to(Sink.foreach[Data](dataConsumer.apply))

推荐阅读