scala - How to parameterize writing a DataFrame to a Hive table
Problem description
I have a list of tables in an RDBMS (across different categories) that I want to extract and save into Hive, and I want to parameterize the job so that the category name is appended to the output location in Hive. For example, for a category "employee" I want to be able to save the tables extracted from the RDBMS under names of the form "hive_db.employee_some_other_random_name".
I have code like the following:
val category = "employee"
val tableList = List("schema.table_1", "schema.table_2", "schema.table_3")
val tableMap = Map("schema.table_1" -> "table_1",
  "schema.table_2" -> "table_2",
  "schema.table_3" -> "table_3")
val queryMap = Map("table_1" -> "(select * from table_1) tble",
  "table_2" -> "(select * from table_2) tble",
  "table_3" -> "(select * from table_3) tble")
val tableBucketMap = Map("table_1" -> "bucketBy(80, \"EMPLOY_ID\", \"EMPLOYE_ST\").sortBy(\"EMPLOY_ST\").format(\"parquet\")",
  "table_2" -> "bucketBy(80, \"EMPLOY_ID\").sortBy(\"EMPLOY_ID\").format(\"parquet\")",
  "table_3" -> "bucketBy(80, \"EMPLOY_ID\", \"SAL_ID\", \"DEPTS_ID\").sortBy(\"EMPLOY_ID\").format(\"parquet\")")
for (table <- tableList) {
  val tableName = tableMap(table)
  val print_start = "STARTING THE EXTRACTION PROCESSING FOR TABLE: %s"
  val print_statement = print_start.format(tableName)
  println(print_statement)

  val extract_query = queryMap(table)
  val query_statement_non = "Query to extract table %s is: "
  val query_statement = query_statement_non.format(tableName)
  println(query_statement + extract_query)

  val extracted_table = spark.read.format("jdbc")
    .option("url", jdbcURL)
    .option("driver", driver_type)
    .option("dbtable", extract_query)
    .option("user", username)
    .option("password", password)
    .option("fetchsize", "20000")
    .option("queryTimeout", "0")
    .load()
  extracted_table.show(5, false)

  // saving extracted table in hive
  val tableBucket = tableBucketMap(table)
  val output_loc = "hive_db.%s_table_extracted_for_%s"
  val hive_location = output_loc.format(category, tableName)
  println(hive_location)

  val saving_table = "%s.write.%s.saveAsTable(\"%s\")"
  saving_table.format(extracted_table, tableBucket, hive_location)
  println(saving_table.format(extracted_table, tableBucket, hive_location))

  val print_end = "COMPLETED EXTRACTION PROCESS FOR TABLE: %s"
  val print_end_statement = print_end.format(tableName)
  println(print_end_statement)
}
I get the result below for the first table; the other tables behave the same way:
STARTING THE EXTRACTION PROCESSING FOR TABLE: table_1
Query to extract table table_1 is: (select * from table_1) tble
+---------+--------------------+
|EMPLOY_ID|EMPLOYE_NM |
+---------+--------------------+
|1 |WELLINGTON |
|2 |SMITH |
|3 |CURLEY |
|4 |PENDRAGON |
|5 |KEESLER |
+---------+--------------------+
only showing top 5 rows
hive_db.employee_table_extracted_for_table_1
[EMPLOY_ID: int, EMPLOYE_NM: string].write.bucketBy(80, "EMPLOY_ID", "EMPLOYE_NO").sortBy("EMPLOY_ID").format("parquet").saveAsTable("hive_db.employee_table_extracted_for_table_1")
COMPLETED EXTRACTION PROCESS FOR TABLE: table_1
Instead of writing the extracted DataFrame into Hive, it only prints the string (showing the DataFrame's schema):
[EMPLOY_ID: int, EMPLOYE_NM: String].write............saveAsTable("hive_db.employee_table_extracted_for_table_1")
How can I write the DF to a Hive table?
Solution
The format string is just text; Scala never evaluates it as code, which is why only the string gets printed. Could you try this approach instead: change your bucket map like this (I have done it for table_1; do the same for table_2 and table_3),
val tableBucketMap = Map("table_1" -> "80,\"employe_st\"")
and replace the string-based call with a real df.bucketBy() call with the proper arguments (numBuckets: Int, colName: String, colNames: String*):
val stringArr=tableBucket.split(",")
val numBuckets=stringArr(0).toInt
val colName=stringArr(1)
extracted_table.write.mode("append").bucketBy(numBuckets,colName).format("parquet").saveAsTable(hive_location)
This approach resolves the problem above, where instead of writing the table it only printed:
[EMPLOY_ID: int, EMPLOYE_NM: String].write............saveAsTable("hive_db.employee_table_extracted_for_table_1")
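If you also need the multi-column bucketing and the sortBy clauses from the original tableBucketMap, splitting on a single comma is not enough. One option is to parse a slightly richer per-table spec into a small case class. This is a sketch in plain Scala; the "numBuckets|bucketCols|sortCols" spec format, the BucketSpec name, and the '|' delimiter are my own assumptions, not from the original post:

```scala
// Per-table bucketing spec: number of buckets, bucket columns, sort columns.
final case class BucketSpec(numBuckets: Int, bucketCols: Seq[String], sortCols: Seq[String])

// Parse "numBuckets|col1,col2|sortCol1,sortCol2" into a BucketSpec.
def parseSpec(raw: String): BucketSpec = raw.split('|') match {
  case Array(n, bucketCols, sortCols) =>
    BucketSpec(n.trim.toInt,
      bucketCols.split(',').map(_.trim).toSeq,
      sortCols.split(',').map(_.trim).toSeq)
  case _ => sys.error(s"bad bucket spec: $raw")
}

// Mirrors the buckets/sorts from the question's tableBucketMap.
val tableBucketMap = Map(
  "table_1" -> "80|EMPLOY_ID,EMPLOYE_ST|EMPLOY_ST",
  "table_2" -> "80|EMPLOY_ID|EMPLOY_ID",
  "table_3" -> "80|EMPLOY_ID,SAL_ID,DEPTS_ID|EMPLOY_ID")

val spec = parseSpec(tableBucketMap("table_1"))

// Applying it inside the loop (Spark's bucketBy/sortBy take
// (first: String, rest: String*), so split head and tail):
//
// extracted_table.write
//   .mode("append")
//   .bucketBy(spec.numBuckets, spec.bucketCols.head, spec.bucketCols.tail: _*)
//   .sortBy(spec.sortCols.head, spec.sortCols.tail: _*)
//   .format("parquet")
//   .saveAsTable(hive_location)
```

This keeps the "one map entry per table" shape of the original code while letting each table declare any number of bucket and sort columns.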