scala - 如何在 Spark Scala 中运行批量配置单元查询
问题描述
我正在使用以下代码从我的 spark 作业循环执行多个配置单元查询
implicit val sparkSession = SparkSession
.builder()
.config(sparkConf)
.enableHiveSupport()
.getOrCreate()
val bizEventFolders = fs.listStatus(outputPath)
bizEventFolders.foreach(folder => {
val filePath = folder.getPath().toString
if(filePath.contains(outputDir+"biz_evnt_key=")){
val bizEventKey = filePath.replaceAll(outputDir+"biz_evnt_key=","")
val addPartitionHiveQuery = s"alter table $tableName add if not exists partition (year=$year, month=$month, day=$day, hr=$hour,biz_evnt_key=$bizEventKey) location '${outputDir}biz_evnt_key=$bizEventKey'"
sparkSession.sql(addPartitionHiveQuery)
logger.info(s"successfully ran add partition hive query $addPartitionHiveQuery")
}
})
问题是,我必须一个接一个地运行多个这样的查询来将所有分区添加到 HIVE 表中,我怎样才能一次提交所有查询来触发而不是一个一个地触发它们?
解决方案
您可以使用Scala Futures或其他并行 AP 并行运行这些查询。
一个简单的解决方案可能是使用par:
bizEventFolders.par.foreach(folder => {
val filePath = folder.getPath().toString
if(filePath.contains(outputDir+"biz_evnt_key=")){
val bizEventKey = filePath.replaceAll(outputDir+"biz_evnt_key=","")
val addPartitionHiveQuery = s"alter table $tableName add if not exists partition (year=$year, month=$month, day=$day, hr=$hour,biz_evnt_key=$bizEventKey) location '${outputDir}biz_evnt_key=$bizEventKey'"
sparkSession.sql(addPartitionHiveQuery)
logger.info(s"successfully ran add partition hive query $addPartitionHiveQuery")
}
})
通过在顺序集合(例如列表)上调用该par
方法,它成为一个并行集合,并且可以以与顺序集合相同的方式使用它。
推荐阅读
- javascript - Firebase 模拟器对本地 FireStore 的请求不成功
- reactjs - 什么是防止浏览器缓存文件反应的最佳解决方案
- git - 如何为 git pull 合并消息提供评论
- terraform - 用于检索键值的 Terraform 过滤器
- c++11 - 使用 C++11 模板生成算法的多个版本
- python - 无法在 Python 中使用 xpath 定位元素
- macos - Mac 上的 Kafka 在启动控制台使用者时没有对节点 -1 的正在进行的请求
- jquery - 背景位置 css 不使用变量
- struct - 如何将约束添加到结构的字段以进行实例化?
- c++ - Unity 2019.4+ 中的 Apple Mach-O 链接器 (ld) 错误