scala - 使用 spark sql 从列表中插入数据到配置单元表中
问题描述
我有一个文件名、文件路径和文件大小的列表,我想使用 spark SQL 将这些详细信息插入我的配置单元表中。
var fs1 = FileSystem.get(sparksession.sparkContext.hadoopConfiguration)
var file_path = fs1.listStatus(new Path("path")).filter(_.isFile).map(_.getPath).toList
var new_files = fs1.listStatus(new Path("path")).filter(_.isFile).map(_.getPath.getName).toList
var file_size = fs1.listStatus(new Path("path")).filter(_.isFile).map(_.getLen).toList
var file_data = file_path zip new_files zip file_size
for ((filedetail, size) <- file_size){
var filepath = filedetail._1
var filesize: Long = size
var filename = filedetail._2
var df = spark.sql(s"insert into mytable(file_path,filename,file_size) select '${file_path}' as file_path,'${new_files}' as filename,'${file_size}' as file_size")
df.write.insertInto("dbname.tablename")
}
它正在生成这个插入查询:
insert into mytable(file_path,filename,file_size) select 'List(path/filename.txt,path/filename4.txt,path/filename5.txt)' as file_path,'List(filename.txt, filename4.txt, filename5.txt)' as filename,'List(19, 19, 19)' as file_size;
我收到一个错误:
不匹配的输入 'file_path' 期望 {'(', 'SELECT', 'FROM', 'VALUES', 'TABLE', 'INSERT', 'MAP', 'REDUCE'}(第 1 行,第 34 行)
我想以表格格式插入数据
file_path filename file_size
path/filename.txt filename.txt 19
path/filename4.txt filename4.txt 19
path/filename5.txt filename5.txt 19
有人可以建议我如何插入上面的数据吗?
有没有办法再次将此查询拆分为 3 个不同的插入配置单元语句。
insert into mytable(file_path,filename,file_size) select 'path/filename.txt' as file_path,'filename.txt' as filename,'19' as file_size;
insert into mytable(file_path,filename,file_size) select 'path/filename3.txt' as file_path,'filename3.txt' as filename,'19' as file_size;
insert into mytable(file_path,filename,file_size) select 'path/filename4.txt' as file_path,'filename4.txt' as filename,'19' as file_size;
解决方案
您可以通过多种方式做到这一点。
首先,您可以将列表转换为RDD
val rdd1 = sc.parallelize(fs1.listStatus(new Path("path")).filter(_.isFile).map(_.getPath).toList)
// then you can convert the rdd into a dataframe
import spark.implicits._
val df1 = rdd1.map((value1, value2, ....) => (value1, value2,....)).toDF("col1", "col2", ....)
// from the dataframe you can create a temporary view
df1.createOrReplaceTempView("my_table")
// then you can load the temporary view in your table
sqlContext.sql("""
INSERT [INTO | OVERWRITE] my_hive_table SELECT * FROM my_table
""")
推荐阅读
- swift - 防止在swift 5中粘贴到文本字段中
- c# - 如何为 sl5 目标框架构建 SDK 风格的项目?
- android - 如何在 react-native 的函数内部组件中使用变量?
- python - GluonCV ImportError:无法导入名称“is_np_array”
- python-3.x - 如果在单元格 A1 中找到字符串,则在 B1 单元格中替换 python pandas
- scala - Spark(scala):如何将 UDF 转换应用于嵌套数据中的数组列
- reactjs - React 类组件中的“Timer”类型上不存在 this.setinterval
- javascript - javascript中 Math.floor() 的第二个参数有什么用?
- javascript - 如何通过 shopify 页面视图触发 zap?
- ionic-framework - 希望将我的 ionic 4 项目与 pouchdb 连接起来。但不断收到错误“无法访问词法声明”