python - 如何以编程方式生成基于数据框的创建表语句
问题描述
我正在尝试具有基于data
avro 数据部分创建 Hive 表的功能。源数据的架构如下所示。目标表需要按partition
源数据中的字段进行分区,有两列name
和description
。我可以通过 获取源data
部分df.select('data.*')
和通过 获取表架构df.select('data.*').schema
,但partition
列不在其中。我的目标是有一个 create table 子句create table mytable (name string, description string) partitioned by (partition integer) store as parquet
我该怎么做?我需要先附加df.select('partition.*')
到 df.select('data.*') 吗?非常感谢您的帮助。
编辑:目标是您不需要指定列的级别,如 data.name 和分区,而只需传入“列”和“分区列”(可以在任何嵌套级别,然后生成创建表语句。
root
|--metadata: struct
| |---id: string
| |---time : string
|--data:struct
| |---name : string
| |---description : string
|--partition:integer
解决方案
以下独立示例向您展示了如何创建和编写您指定的表。您需要提供自己的path_for_saving
.
import pyspark.sql.functions as F
import pyspark.sql.types as T
schema = T.StructType([
T.StructField('metadata', T.StructType([
T.StructField("id",T.StringType()),
T.StructField("time",T.StringType())])),
T.StructField('data', T.StructType([
T.StructField("name",T.StringType()),
T.StructField("description",T.StringType()),
])),
T.StructField("partition", T.IntegerType()),
T.StructField("Level1", T.StructType([
T.StructField("Level2",T.StructType([
T.StructField("Level3", T.StructType([
T.StructField("partition_alt", T.IntegerType())]))]))]))
])
df_sample_data = spark.createDataFrame([(("id1", "time1"), ("name1", "desc1"), 1, (((3,),),)), (("id2", "time2"), ("name2", "desc2"), 2, (((4,),),)) ], schema)
df_sample_data.printSchema()
df_sample_data.show()
def parse_fields(schema, path=""):
collect = []
for struct_field in schema:
this_field_name = struct_field.name
if type(struct_field.dataType) == T.StructType:
collect = collect + parse_fields(struct_field.dataType, path + this_field_name + ".")
else:
collect = collect + [path + this_field_name]
return collect
parsed_fields = parse_fields(schema) # Find all leaf fields in the schema and return as '.' seperated path
print("Parsed fields:" + str(parsed_fields))
def get_column(col_name):
for field in parsed_fields:
if col_name in field:
return F.col(field).alias(col_name)
name_col = "name"
description_col = "description"
partition_col = "partition_alt"
df_mytable = df_sample_data.select(get_column(name_col), get_column(description_col), get_column(partition_col))
df_mytable.show()
df_mytable.write.partitionBy(partition_col).format("parquet").save(path_for_saving)
输出:
root
|-- metadata: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- time: string (nullable = true)
|-- data: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- description: string (nullable = true)
|-- partition: integer (nullable = true)
|-- Level1: struct (nullable = true)
| |-- Level2: struct (nullable = true)
| | |-- Level3: struct (nullable = true)
| | | |-- partition_alt: integer (nullable = true)
+------------+--------------+---------+-------+
| metadata| data|partition| Level1|
+------------+--------------+---------+-------+
|{id1, time1}|{name1, desc1}| 1|{{{3}}}|
|{id2, time2}|{name2, desc2}| 2|{{{4}}}|
+------------+--------------+---------+-------+
Parsed fields:['metadata.id', 'metadata.time', 'data.name', 'data.description', 'partition', 'Level1.Level2.Level3.partition_alt']
+-----+-----------+-------------+
| name|description|partition_alt|
+-----+-----------+-------------+
|name1| desc1| 3|
|name2| desc2| 4|
+-----+-----------+-------------+
该示例演示了如何查找深度嵌套的字段。您需要get_column
使用自己的标准重写以将字段名称与完整的列名称匹配。在这里,get_column
只返回第一个包含名称的字段col_name
。
推荐阅读
- php - TCPDF 和 base64 图像
- java - 当我尝试在 android studio 上运行构建时,我不断收到此错误
- mysql - 从两个级别查找儿童用户
- swift - 列表选择为 Set
- 如何使用? - python-3.x - 如何在熊猫中使用多个过滤器
- mysql - 如何通过在 mysql 中加载数据来解决此错误?
- javascript - 在焦点上添加柔和的光晕到输入
- firebase - 如何使用 Cloud Firestore 对聊天应用程序的聊天频道进行排序
- javascript - 在 Raspberry Pi4 上运行电子时出现问题。它在 Raspberry Pi3 上运行良好
- api - Google Sheets API Node.js 追加覆盖