首页 > 解决方案 > Databricks - 测试 SQL 创建的表存在正确的列

问题描述

我正在尝试在 Databricks 中创建一个测试,以检查是否已使用正确的列正确创建了一组表。这感觉好像它应该很简单,但我不能完全掌握解决方案,一切都是从 Oracle 迁移的,我的背景是 Oracle 和 SQL,而不是 python。

例如,想象以下将填充仪表板数据的示例表。如果它已经以不同的结构存在,则报告脚本将失败。

%sql

CREATE TABLE IF NOT EXISTS $report_schema.p23a_com 
(AGE_GROUP                                STRING,
 UniqServReqID                            STRING,
 Rec_Code                                 STRING,
 Elapsed_days                             INT)
USING delta 
PARTITIONED BY (AGE_GROUP)

部分测试如下,但显然由于分区列信息,断言失败。我似乎无法让 DESCRIBE 不那么冗长,我可以从输入列表中删除 # 但这看起来很混乱,并且在我扩展测试以获取数据类型时变得更加困难。有没有更好的方法来捕获模式?

def get_table_schema(dbase,table_name):

    desc_query="DESCRIBE "+dbase+"."+table_name
    df_tab_cols = sqlContext.sql(desc_query)
    return df_tab_cols

def test_table_schema(tab_cols,list_tab_cols):

    input_col_list = df_tab_cols.select("col_name").rdd.map(lambda row : row[0]).collect()
    assert set(input_col_list) == set(list_tab_cols)

db = report_schema
table = "p23a_com"
cols = ["AGE_GROUP","UniqServReqID","Rec_Code","Elapsed_days"]

df_tab_cols = get_table_schema(db,table)

test_table_schema(df_tab_cols,cols)


标签: unit-testingdatabricks

解决方案


答案是我有点厚,而且过于注重 SQL。我需要做的不是使用 SQL 描述,而是通过 spark 将表直接读入数据帧,然后使用列。

IE

def get_table_schema(dbase,table_name):

    desc_query = dbase+"."+table_name
    df_tab_cols = spark.table(desc_query)
    return df_tab_cols

def test_table_schema(tab_cols,list_tab_cols):

    input_col_list = list(df_tab_cols.columns) 
    assert set(input_col_list) == set(list_tab_cols)

    print(input_col_list)

推荐阅读