scala - Fs2处理管道之间的错误?
问题描述
我正在尝试编写一个程序来抓取文件,进行更改,然后将它们保存在其他地方。
所以我的程序看起来像这样:
def processFiles: Stream[F, Unit] =
getFilePaths.flatMap(path =>
downloadFile(path)
.through(scanForViruses)
.through(encrypt)
.through(saveElsewhere)
)
def downloadFile(path: Path): Stream[F, Byte]
def scanForViruses: Pipe[F, Byte, Byte]
def encrypt: Pipe[F, Byte, Byte]
def saveElsewhere: Pipe[F, Byte, Unit]
其中一些步骤可能会失败,并且使用上面的代码,单个失败的文件会阻止其他文件的处理。所以我需要添加一些处理:
getFilePaths.flatMap(path =>
downloadFile(path)
.handleErrorsWith(e => log.error("failed at download", e) >> Stream.empty)
.through(scanForViruses)
.handleErrorsWith(e => log.error("failed at scan", e) >> Stream.empty)
.through(encrypt)
.handleErrorsWith(e => log.error("failed at encrypt", e) >> Stream.empty)
.through(saveElsewhere)
.handleErrorsWith(e => log.error("failed at save", e) >> Stream.empty)
)
这会正确捕获错误,但异常之前流中的所有元素仍会通过
例如,如果第一个文件流如下所示: Stream(Byte1, Byte2, Exception) 那么 handleErrorWith 将返回 Stream(Byte1, Byte2) 这对于文件没有意义。
另一方面,如果我rethrow
是异常(或不理会它),我会得到我想要的整体行为:它使该特定文件的流失败。但是每个错误处理程序都会在异常上被调用,我没有关于它实际失败的信息:
getFilePaths.flatMap(path =>
downloadFile(path).attempt.evalMap(handler1).rethrow
.through(scanForViruses).attempt.evalMap(handler2).rethrow
.through(encrypt).attempt.evalMap(handler3).rethrow
.through(saveElsewhere).attempt.evalMap(handler4)
)
^^ 如果发生异常,上面将调用所有处理程序downloadFile
有没有办法在第一个异常时使整个流失败,并跟踪失败的位置,而不必每次执行步骤时都编译流?
解决方案
看来我的用例不是流的预期目的。
换句话说,我原来的程序的结构是关闭的。
我正在用无标签 final 编写,我用作消除器的函数被编写为Pipe[F, Byte, Unit]
,但我想要的功能需要更接近的东西Stream[F, Byte] => F[Unit]
,然后我将只使用内置的错误处理F
推荐阅读
- swift - 在初始应用午餐时在 init 中显示工作表
- minio - 有没有办法通过基于 sql 的语句来处理 Minio 文件?
- javascript - 捕获除解决/拒绝承诺以外的 Http 错误
- python - 如何在 Python 的井字游戏中实现极小极大函数?
- css - 如何用 fontawesome 替换默认的 woocomerce star raiting 图标?
- github - 如何分配自定义名称以在 GitHub 上发布工件
- javascript - 如何检查 MessageEmbed 中的关键字?
- ssl - 无法删除 Google 管理的 SSL 证书
- python - Python3 在使用 gspread 时重试坚韧(不带装饰器)会出现错误,声称“缺少参数”
- python - 从特定用户列表中获取实时推文...获取重复的 tweepy python