apache-spark - 如何在 Spark 工作时高效地等待外部任务完成?
问题描述
我有以下问题:我们用 spark 读取了一个大的 CSV 文件。我们需要转换文件的每一行并将其写回另一个文本文件。在此转换过程中,我们需要通过 REST 调用外部服务,并且在写入文件之前,我们需要从服务中获取答案。同时,我们可以在 Spark 上做其他的动作。
一个简单的实现应该是这样的:
val keys = spark.read.csv("/path/to/myfile.csv")
.map(row => {
val result = new Param(row(0), row(3), row(7))
result
})
.collect()
val enrichedData: Map[String,String] = keys.map(key =>
externalService.getValue(key)) // unpredictable response time
val finalResult = spark.read.csv("/path/to/myfile.csv") // read same file twice
.map(row => doSomeTransformation(row))
.map(row => doSomeMoreTransformation(row))
.map(row => andAnotherCostlyOperation(row))
.map(row => {
val key = row(0)
val myData = enrichedData(key) // only here we use the data from ext.serv.
enrichRow(row, myData)
})
.write.csv("/path/to/output.csv")
这里的问题是我两次读取相同的输入文件。首先,我需要从文件中收集所有密钥并将它们发送到外部服务。一旦外部服务响应,我可以再次检查同一个文件并使用我获得的数据来完成数据处理。
如何更有效地做到这一点?直到最后一步,我才真正需要来自外部服务的数据,所以如果 Spark 可以并行执行其他转换,然后当它到达最后一个 map() 函数时,它会等待(可能0 秒)用于来自外部服务的数据。
解决方案
推荐阅读
- google-bigquery - BigQuery 云功能
- cocoa - 双击透明 NSWindow 标题不会最大化窗口
- c# - 一台服务器上的 WCF 错误,但另一台服务器上没有:套接字连接已中止
- mongodb - 远程转储 mongodb 集合
- powershell - PowerShell Get-Acl - 获取成员而不是组
- java - 左对齐 Google Map Android 上的“我的位置”按钮
- selenium - Xpath 根据特定单元格中包含的文本从表格单元格中选择按钮
- powershell - 将文件夹安全性链接到 AD 安全组 Powershell 脚本
- javascript - Rails:尝试在javascript中使用ruby数组时使用“null”而不是具有3个元素的数组
- r - 在 tmap 中绘制山体阴影