How to programmatically generate a CREATE TABLE statement based on a dataframe

Problem Description

I am trying to build functionality that creates a Hive table based on the data section of Avro source data. The schema of the source data is shown below. The target table needs to be partitioned by the partition field from the source data and have two columns, name and description. I can get the source data section with df.select('data.*') and the table schema with df.select('data.*').schema, but the partition column is not included. My goal is to end up with a create table clause like create table mytable (name string, description string) partitioned by (partition integer) stored as parquet. How can I do this? Do I need to append df.select('partition.*') to df.select('data.*') first? Any help is much appreciated.

Edit: the goal is that you should not have to specify the nesting level of the columns, such as data.name and partition; instead you pass in only the "columns" and the "partition columns" (which can be at any nesting level), and the create table statement is generated from those.

root
 |-- metadata: struct
 |    |-- id: string
 |    |-- time: string
 |-- data: struct
 |    |-- name: string
 |    |-- description: string
 |-- partition: integer

Tags: python, json, pyspark, avro

Solution


The following self-contained example shows how to create and write the table you specified. You will need to supply your own path_for_saving.

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

# In spark-submit or a notebook a SparkSession usually already exists;
# this makes the example runnable as a standalone script.
spark = SparkSession.builder.getOrCreate()

schema = T.StructType([
          T.StructField('metadata', T.StructType([
            T.StructField("id",T.StringType()),
            T.StructField("time",T.StringType())])),
          T.StructField('data', T.StructType([
            T.StructField("name",T.StringType()),
            T.StructField("description",T.StringType()),
          ])),
          T.StructField("partition", T.IntegerType()),
          T.StructField("Level1", T.StructType([
            T.StructField("Level2",T.StructType([
              T.StructField("Level3", T.StructType([
                T.StructField("partition_alt", T.IntegerType())]))]))]))
         ])
df_sample_data = spark.createDataFrame([(("id1", "time1"), ("name1", "desc1"), 1, (((3,),),)), (("id2", "time2"), ("name2", "desc2"), 2, (((4,),),)) ], schema)
df_sample_data.printSchema()
df_sample_data.show()

def parse_fields(schema, path=""):
  collect = []
  for struct_field in schema:
    this_field_name = struct_field.name
    if isinstance(struct_field.dataType, T.StructType):
      collect = collect + parse_fields(struct_field.dataType, path + this_field_name + ".")
    else: 
      collect = collect + [path + this_field_name]
  return collect

parsed_fields = parse_fields(schema) # Find all leaf fields in the schema and return them as '.'-separated paths
print("Parsed fields:" + str(parsed_fields))
def get_column(col_name):
  for field in parsed_fields:
    if col_name in field:
      return F.col(field).alias(col_name)
    
name_col = "name"
description_col = "description"
partition_col = "partition_alt"

df_mytable = df_sample_data.select(get_column(name_col), get_column(description_col), get_column(partition_col))
df_mytable.show()

df_mytable.write.partitionBy(partition_col).format("parquet").save(path_for_saving)  # path_for_saving: your own output path

Output:

root
 |-- metadata: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- time: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- description: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- Level1: struct (nullable = true)
 |    |-- Level2: struct (nullable = true)
 |    |    |-- Level3: struct (nullable = true)
 |    |    |    |-- partition_alt: integer (nullable = true)

+------------+--------------+---------+-------+
|    metadata|          data|partition| Level1|
+------------+--------------+---------+-------+
|{id1, time1}|{name1, desc1}|        1|{{{3}}}|
|{id2, time2}|{name2, desc2}|        2|{{{4}}}|
+------------+--------------+---------+-------+

Parsed fields:['metadata.id', 'metadata.time', 'data.name', 'data.description', 'partition', 'Level1.Level2.Level3.partition_alt']
+-----+-----------+-------------+
| name|description|partition_alt|
+-----+-----------+-------------+
|name1|      desc1|            3|
|name2|      desc2|            4|
+-----+-----------+-------------+

The example demonstrates how to find deeply nested fields. You will need to rewrite get_column with your own criteria for matching a field name to a full column path; as written, get_column simply returns the first field whose path contains col_name.
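The answer above writes the partitioned Parquet files directly. If you also want the literal create table string from the question, a minimal sketch along the following lines could derive it from the schema of df_mytable; the helper build_create_table and the table name mytable are illustrative assumptions, not part of the original answer.

def build_create_table(df, table_name, partition_cols):
  # Partition columns go into the PARTITIONED BY clause; everything else
  # becomes a regular column. simpleString() gives Hive-style type names.
  data_fields = [f for f in df.schema.fields if f.name not in partition_cols]
  part_fields = [f for f in df.schema.fields if f.name in partition_cols]
  col_defs = ", ".join(f"{f.name} {f.dataType.simpleString()}" for f in data_fields)
  part_defs = ", ".join(f"{f.name} {f.dataType.simpleString()}" for f in part_fields)
  return f"create table {table_name} ({col_defs}) partitioned by ({part_defs}) stored as parquet"

print(build_create_table(df_mytable, "mytable", {partition_col}))
# create table mytable (name string, description string) partitioned by (partition_alt int) stored as parquet

Note that simpleString() renders IntegerType as int, which is the standard Hive spelling of the type the question writes as integer.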

