scala - 在将 mongo 集合作为流迭代时处理错误
问题描述
我有一个简单的方法,它使用 akka 流遍历 mongo 集合,并且对于每个元素,我调用一个enrichDataFromGoogleAndInsert
调用 google api 的方法来丰富文档数据并将其插入到新集合中,enrichDataFromGoogleAndInsert
异步也是如此。
def processVendors()(implicit m: Materializer): Future[Done] = {
val vendorsSource: Source[Vendor, Future[State]] =
collection.find(json())
.noCursorTimeout
.cursor[Vendor]()
.documentSource()
.throttle(50, 1.second)
vendorsSource
.runForeach(vendor =>
enrichDataFromGoogleAndInsert(vendor)
)
}
我从控制器运行此方法,我想知道如何累积错误,并确保脚本在enrichDataFromGoogleAndInsert
抛出某种错误时停止。
解决方案
def processVendors()(implicit m: Materializer): Future[Done] = {
val vendorsSource: Source[Vendor, Future[State]] =
collection.find(json())
.noCursorTimeout
.cursor[Vendor]()
.documentSource()
vendorsSource
.mapAsync(50)(vendor =>
enrichDataFromGoogleAndInsert(vendor)
)
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(Sink.ignore)
}
推荐阅读
- android - 断开辅助显示后 Android Studio 不显示
- selenium - 如何验证特定元素是否在 Selenide 中不可点击
- python - 如何从函数计算值而不返回它们或将它们设置为全局(对于 numba.cuda)?
- dart - forEach 没有预期的过滤
- python - 以 CBC 模式加密时出现错误 3 Python AES 错误
- c# - OnActionExecuting 和 beginexecute 方法有什么区别?
- java - mock-maker-inline 使测试失败,并在非最终非静态类上“传递给 Mockito.mockingDetails() 的 NotAMockException 参数应该是一个模拟”
- sql - SSRS 报告表达式
- xml - 在 Powershell 中从 XML 中获取数据 - Xpath 用法
- javascript - ajax 发布在 chrome 上工作,但在 firefox 和 safari 上不工作