首页 > 解决方案 > Fs2处理管道之间的错误?

问题描述

我正在尝试编写一个程序来抓取文件,进行更改,然后将它们保存在其他地方。

所以我的程序看起来像这样:

def processFiles: Stream[F, Unit] =
  getFilePaths.flatMap(path =>
    downloadFile(path)
      .through(scanForViruses)
      .through(encrypt)
      .through(saveElsewhere)
  )

def downloadFile(path: Path): Stream[F, Byte]

def scanForViruses: Pipe[F, Byte, Byte]

def encrypt: Pipe[F, Byte, Byte]

def saveElsewhere: Pipe[F, Byte, Unit]

其中一些步骤可能会失败,并且使用上面的代码,单个失败的文件会阻止其他文件的处理。所以我需要添加一些处理:

getFilePaths.flatMap(path =>
    downloadFile(path)
      .handleErrorsWith(e => log.error("failed at download", e) >> Stream.empty)
      .through(scanForViruses)
      .handleErrorsWith(e => log.error("failed at scan", e) >> Stream.empty)      
      .through(encrypt)
      .handleErrorsWith(e => log.error("failed at encrypt", e) >> Stream.empty)   
      .through(saveElsewhere)
      .handleErrorsWith(e => log.error("failed at save", e) >> Stream.empty)
  )

这会正确捕获错误,但异常之前流中的所有元素仍会通过

例如,如果第一个文件流如下所示: Stream(Byte1, Byte2, Exception) 那么 handleErrorWith 将返回 Stream(Byte1, Byte2) 这对于文件没有意义。

另一方面,如果我rethrow是异常(或不理会它),我会得到我想要的整体行为:它使该特定文件的流失败。但是每个错误处理程序都会在异常上被调用,我没有关于它实际失败的信息:

getFilePaths.flatMap(path =>
    downloadFile(path).attempt.evalMap(handler1).rethrow
      .through(scanForViruses).attempt.evalMap(handler2).rethrow      
      .through(encrypt).attempt.evalMap(handler3).rethrow      
      .through(saveElsewhere).attempt.evalMap(handler4)
  )

^^ 如果发生异常,上面将调用所有处理程序downloadFile

有没有办法在第一个异常时使整个流失败,并跟踪失败的位置,而不必每次执行步骤时都编译流?

标签: scalafs2

解决方案


看来我的用例不是流的预期目的。

换句话说,我原来的程序的结构是关闭的。

我正在用无标签 final 编写,我用作消除器的函数被编写为Pipe[F, Byte, Unit],但我想要的功能需要更接近的东西Stream[F, Byte] => F[Unit],然后我将只使用内置的错误处理F


推荐阅读