首页 > 解决方案 > 如何在scala中访问和合并Future类型的多个DataFrame

问题描述

我有火花斯卡拉应用程序。我正在尝试在其中使用 Futures 来并行化少数独立的操作集。我在 Futures 中调用它们并且它们返回给我 Future 类型的 DataFrame 我如何在最后合并它们并在任何 Future 无法计算时抛出错误。下面是我的代码。当我尝试在 onComplete 块中应用 Dataframe 的联合时,它说这个错误

value union is not a member of scala.concurrent.Future[(scala.concurrent.Future[org.apache.spark.sql.DataFrame], scala.concurrent.Future[org.apache.spark.sql.DataFrame], scala.concurrent.Future[org.apache.spark.sql.DataFrame])]..

任何帮助将不胜感激谢谢

val future_session = Future{ ProcessSession(df, spark) }
val future_links =  Future{ ProcessSession(df, spark) }
val future_nodes = Future { ProcessSession(df, spark) }

val result = for {
      r1 <- future_session
      r2 <- future_links
      r3 <- future_nodes
} yield ( 
   r1,r2,r3
)

result.onComplete {          
    case Success(x) => {
      log.info("Execution completed")       
    }
    case Failure(e) => e.printStackTrace
}

标签: scalaapache-sparkparallel-processingapache-spark-sqlspark-streaming

解决方案


看起来它ProcessSession.apply本身会导致 aFuture[DataFrame] 取决于在你最终得到 a 之前完成了多少工作Future,你会想要:

  • 如果工作量很大(因此ProcessSession.apply并行调用对于在执行上下文中创建承诺和调度任务的额外开销很有用),您可以使用.flattena 上的方法Future[Future[T]]来删除一层“未来性”:

    for {
      r1 <- future_session.flatten
      r2 <- future_links.flatten
      r3 <- future_nodes.flatten
    } // and so forth
    
  • 如果ProcessSession.apply在返回 a 之前没有做太多Future,那么只需Future用原始ProcessSession调用替换块:

    val future_session = ProcessSession(df, spark)
    val future_links = ProcessSession(df, spark) // Not sure what you really wanted here, but I'm going with what was in the code you posted
    val future_nodes = ProcessSession(df, spark)
    

推荐阅读