scala - 如何在scala中访问和合并Future类型的多个DataFrame
问题描述
我有火花斯卡拉应用程序。我正在尝试在其中使用 Futures 来并行化少数独立的操作集。我在 Futures 中调用它们并且它们返回给我 Future 类型的 DataFrame 我如何在最后合并它们并在任何 Future 无法计算时抛出错误。下面是我的代码。当我尝试在 onComplete 块中应用 Dataframe 的联合时,它说这个错误
value union is not a member of scala.concurrent.Future[(scala.concurrent.Future[org.apache.spark.sql.DataFrame], scala.concurrent.Future[org.apache.spark.sql.DataFrame], scala.concurrent.Future[org.apache.spark.sql.DataFrame])]..
任何帮助将不胜感激谢谢
val future_session = Future{ ProcessSession(df, spark) }
val future_links = Future{ ProcessSession(df, spark) }
val future_nodes = Future { ProcessSession(df, spark) }
val result = for {
r1 <- future_session
r2 <- future_links
r3 <- future_nodes
} yield (
r1,r2,r3
)
result.onComplete {
case Success(x) => {
log.info("Execution completed")
}
case Failure(e) => e.printStackTrace
}
解决方案
看起来它ProcessSession.apply
本身会导致 aFuture[DataFrame]
取决于在你最终得到 a 之前完成了多少工作Future
,你会想要:
如果工作量很大(因此
ProcessSession.apply
并行调用对于在执行上下文中创建承诺和调度任务的额外开销很有用),您可以使用.flatten
a 上的方法Future[Future[T]]
来删除一层“未来性”:for { r1 <- future_session.flatten r2 <- future_links.flatten r3 <- future_nodes.flatten } // and so forth
如果
ProcessSession.apply
在返回 a 之前没有做太多Future
,那么只需Future
用原始ProcessSession
调用替换块:val future_session = ProcessSession(df, spark) val future_links = ProcessSession(df, spark) // Not sure what you really wanted here, but I'm going with what was in the code you posted val future_nodes = ProcessSession(df, spark)
推荐阅读
- javascript - 将远程文件流式传输到“fs.createReadStream”
- wpf - 如何在停靠面板中正确对齐和拉伸文本?
- powershell - 尝试在 PowerShell 更新后在线连接到 PnP 时出错
- python - 将上下文附加到变量但不影响其有效值的方法
- python - 安装 django-observer 包的问题
- python - 将 async-for 与 if 条件结合以中断中间等待的正确方法是什么?
- typescript - Firebase 功能部署未更新
- google-cloud-data-fusion - 错误:遇到错误字符 (ASCII 0)。在数据融合
- graphics - 什么是 vulkan 颜色空间?
- django - 仅将视图添加到 Django 以进行测试(不在生产中)