首页 > 解决方案 > 在将 mongo 集合作为流迭代时处理错误

问题描述

我有一个简单的方法,它使用 akka 流遍历 mongo 集合,并且对于每个元素,我调用一个enrichDataFromGoogleAndInsert调用 google api 的方法来丰富文档数据并将其插入到新集合中,enrichDataFromGoogleAndInsert异步也是如此。

   def processVendors()(implicit m: Materializer): Future[Done] = {
    val vendorsSource: Source[Vendor, Future[State]] =
      collection.find(json())
      .noCursorTimeout
      .cursor[Vendor]()
      .documentSource()
      .throttle(50, 1.second)

    vendorsSource
    .runForeach(vendor => 
      enrichDataFromGoogleAndInsert(vendor)
    )
  }

我从控制器运行此方法,我想知道如何累积错误,并确保脚本在enrichDataFromGoogleAndInsert抛出某种错误时停止。

标签: scalaakka-stream

解决方案


def processVendors()(implicit m: Materializer): Future[Done] = {
val vendorsSource: Source[Vendor, Future[State]] =
  collection.find(json())
    .noCursorTimeout
    .cursor[Vendor]()
    .documentSource()


vendorsSource
  .mapAsync(50)(vendor =>
    enrichDataFromGoogleAndInsert(vendor)
  )
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .runWith(Sink.ignore)
}

推荐阅读