首页 > 解决方案 > 如何在 Spark Scala 中运行批量配置单元查询

问题描述

我正在使用以下代码从我的 spark 作业循环执行多个配置单元查询

implicit val sparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()

 val bizEventFolders = fs.listStatus(outputPath)
    bizEventFolders.foreach(folder => {
      val filePath = folder.getPath().toString
      if(filePath.contains(outputDir+"biz_evnt_key=")){
        val bizEventKey = filePath.replaceAll(outputDir+"biz_evnt_key=","")
        val addPartitionHiveQuery = s"alter table $tableName add if not exists partition (year=$year, month=$month, day=$day, hr=$hour,biz_evnt_key=$bizEventKey) location '${outputDir}biz_evnt_key=$bizEventKey'"
        sparkSession.sql(addPartitionHiveQuery)
        logger.info(s"successfully ran add partition hive query $addPartitionHiveQuery")
      }
    })

问题是,我必须一个接一个地运行多个这样的查询来将所有分区添加到 HIVE 表中,我怎样才能一次提交所有查询来触发而不是一个一个地触发它们?

标签: scalaapache-sparkhiveapache-spark-sql

解决方案


您可以使用Scala Futures或其他并行 AP 并行运行这些查询。

一个简单的解决方案可能是使用par

bizEventFolders.par.foreach(folder => {
      val filePath = folder.getPath().toString
      if(filePath.contains(outputDir+"biz_evnt_key=")){
        val bizEventKey = filePath.replaceAll(outputDir+"biz_evnt_key=","")
        val addPartitionHiveQuery = s"alter table $tableName add if not exists partition (year=$year, month=$month, day=$day, hr=$hour,biz_evnt_key=$bizEventKey) location '${outputDir}biz_evnt_key=$bizEventKey'"
        sparkSession.sql(addPartitionHiveQuery)
        logger.info(s"successfully ran add partition hive query $addPartitionHiveQuery")
      }
    })

通过在顺序集合(例如列表)上调用该par方法,它成为一个并行集合,并且可以以与顺序集合相同的方式使用它。


推荐阅读