首页 > 解决方案 > 如何在加载 S3 时自动推断 CSV 文件的架构?

问题描述

语境

目前,我使用 Snowflake 作为数据仓库,使用 AWS 的 S3 作为数据湖。登陆 S3 的大多数文件都是Parquet格式。对于这些,我使用了 Snowflake 的一个新的受限功能(在此处记录),它可以自动检测 S3 上 parquet 文件中的模式,我可以使用它来生成CREATE TABLE具有正确列名和推断数据类型的语句。此功能目前仅适用于 Apache Parquet、Avro 和 ORC 文件。我想找到一种方法来实现相同的预期目标,但适用于 CSV 文件。

我试图做的

这就是我目前推断 Parquet 文件架构的方式:

select generate_column_description(array_agg(object_construct(*)), 'table') as columns 
from table (infer_schema(location=>'${LOCATION}', file_format=>'${FILE_FORMAT}'))

但是,如果我尝试将其指定FILE_FORMAT为 csv,则该方法将失败。

我考虑过的其他方法:

  1. 将 S3 上的所有文件传输到 parquet(这涉及更多代码和基础设施设置,因此不是我的首选,尤其是我想在 s3 上保留一些文件的自然类型)
  2. 有一个脚本(例如使用 Python 中的 Pandas 之类的库)来推断 S3 中文件的架构(这也涉及更多代码,并且从在 Snowflake 中处理 parquet 文件的意义上来说会很奇怪,但非 parquet 文件由aws上的一些脚本)。
  3. 使用 Snowflake UDF 推断架构。还没有完全考虑我的选择。

期望的行为

当一个新的 csv 文件登陆 S3(在预先存在的 STAGE 上)时,我想推断模式,并能够生成CREATE TABLE具有推断数据类型的语句。最好,我想在 Snowflake 中这样做,因为现有的上述模式推理解决方案在那里存在。如果需要,很高兴添加更多信息。

标签: amazon-s3snowflake-cloud-data-platform

解决方案


更新:我修改了在无类型(所有字符串类型列)表中推断数据类型的 SP,它现在直接针对 Snowflake 阶段。项目代码可在此处获得:https ://github.com/GregPavlik/InferSchema

我写了一个存储过程来帮助解决这个问题;但是,它的唯一目标是推断无类型列的数据类型。它的工作原理如下:

  1. 将 CSV 加载到表中,所有列都定义为 varchars。
  2. 使用针对新表的查询调用 SP(要点是仅获取所需的列并限制行数以保持类型推断时间合理)。
  3. 在 SP 调用中还有旧位置和新位置的数据库、模式和表——旧的带有所有 varchar,新的带有推断的类型。

然后 SP 将推断数据类型并创建两个 SQL 语句。一条语句将使用推断的数据类型创建新表。一条语句将使用适当的包装器(例如 try_multi_timestamp(),一个扩展 try_to_timestamp() 以尝试各种常见格式的 UDF)从无类型(全 varchar)表复制到新表。

我的意思是扩展它,以便它根本不需要无类型(所有 varchar)表,但还没有解决它。既然它出现在这里,我可能会回过头来用这种能力更新 SP。您可以指定直接从阶段读取的查询,但您必须使用 $1、$2... 和列名的别名(否则 DDL 将尝试创建像 $1 这样的列名)。如果查询直接针对某个阶段运行,对于旧的数据库、模式和表,您可以输入任何内容,因为这仅用于从 select 语句生成插入。

-- This shows how to use on the Snowflake TPCH sample, but could be any query.
-- Keep the row count down to reduce the time it take to infer the types.
call infer_data_types('select * from SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM limit 10000', 
                      'SNOWFLAKE_SAMPLE_DATA', 'TPCH_SF1', 'LINEITEM',
                      'TEST', 'PUBLIC', 'LINEITEM');

create or replace procedure INFER_DATA_TYPES(SOURCE_QUERY string,
                                             DATABASE_OLD string,
                                             SCHEMA_OLD string,
                                             TABLE_OLD string,
                                             DATABASE_NEW string,
                                             SCHEMA_NEW string,
                                             TABLE_NEW string)
returns string
language javascript
as
$$

/****************************************************************************************************
*                                                                                                   *
*  DataType Classes                                        
*                                                                                                   *
****************************************************************************************************/

class Query{
    constructor(statement){
        this.statement = statement;
    }
}


class DataType {
    constructor(db, schema, table, column, sourceQuery) {
        this.db = db;
        this.schema = schema;
        this.table = table;
        this.sourceQuery = sourceQuery
        this.column = column;
        this.insert = '"@~COLUMN~@"';
        this.totalCount = 0;
        this.notNullCount = 0;
        this.typeCount = 0;
        this.blankCount = 0;
        this.minTypeOf  = 0.95;
        this.minNotNull = 1.00;
    }
    setSQL(sqlTemplate){
        this.sql = sqlTemplate;
        this.sql = this.sql.replace(/@~DB~@/g,     this.db);
        this.sql = this.sql.replace(/@~SCHEMA~@/g, this.schema);
        this.sql = this.sql.replace(/@~TABLE~@/g,  this.table);
        this.sql = this.sql.replace(/@~COLUMN~@/g, this.column);
    }
    getCounts(){
        var rs;
        rs = GetResultSet(this.sql);
        rs.next();
        this.totalCount   = rs.getColumnValue("TOTAL_COUNT");
        this.notNullCount = rs.getColumnValue("NON_NULL_COUNT");
        this.typeCount    = rs.getColumnValue("TO_TYPE_COUNT");
        this.blankCount   = rs.getColumnValue("BLANK");
    }
    isCorrectType(){
        return (this.typeCount / (this.notNullCount - this.blankCount) >= this.minTypeOf);
    }
    isNotNull(){
        return (this.notNullCount / this.totalCount >= this.minNotNull);
    }
}

class TimestampType extends DataType{
    constructor(db, schema, table, column, sourceQuery){
        super(db, schema, table, column, sourceQuery)
        this.syntax = "timestamp";
        this.insert = 'try_multi_timestamp(trim("@~COLUMN~@"))';
        this.sourceQuery = SOURCE_QUERY;
        this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
        this.getCounts();
    }
}

class IntegerType extends DataType{
    constructor(db, schema, table, column, sourceQuery){
        super(db, schema, table, column, sourceQuery)
        this.syntax = "number(38,0)";
        this.insert = 'try_to_number(trim("@~COLUMN~@"), 38, 0)';
        this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
        this.getCounts();
    }
}

class DoubleType extends DataType{
    constructor(db, schema, table, column, sourceQuery){
        super(db, schema, table, column, sourceQuery)
        this.syntax = "double";
        this.insert = 'try_to_double(trim("@~COLUMN~@"))';
        this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
        this.getCounts();
    }
}

class BooleanType extends DataType{
    constructor(db, schema, table, column, sourceQuery){
        super(db, schema, table, column, sourceQuery)
        this.syntax = "boolean";
        this.insert = 'try_to_boolean(trim("@~COLUMN~@"))';
        this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
        this.getCounts();
    }
}

 // Catch all is STRING data type
class StringType extends DataType{
    constructor(db, schema, table, column, sourceQuery){
        super(db, schema, table, column, sourceQuery)
        this.syntax = "string";
        this.totalCount   = 1;
        this.notNullCount = 0;
        this.typeCount    = 1;
        this.minTypeOf    = 0;
        this.minNotNull   = 1;
    }
}

/****************************************************************************************************
*                                                                                                   *
*  Main function                                                                                    *
*                                                                                                   *
****************************************************************************************************/

var pass    = 0;
var column;
var typeOf;
var ins = '';

var newTableDDL = '';
var insertDML   = '';

var columnRS = GetResultSet(GetTableColumnsSQL(DATABASE_OLD, SCHEMA_OLD, TABLE_OLD));

while (columnRS.next()){
    pass++;
    if(pass > 1){
        newTableDDL += ",\n";
        insertDML   += ",\n";
    }
    column = columnRS.getColumnValue("COLUMN_NAME");
    typeOf = InferDataType(DATABASE_OLD, SCHEMA_OLD, TABLE_OLD, column, SOURCE_QUERY);
    newTableDDL += '"' + typeOf.column + '" ' + typeOf.syntax;
    ins = typeOf.insert;
    insertDML   += ins.replace(/@~COLUMN~@/g, typeOf.column);
}

return GetOpeningComments()                                     +
       GetDDLPrefixSQL(DATABASE_NEW, SCHEMA_NEW, TABLE_NEW)     +
       newTableDDL                                              +
       GetDDLSuffixSQL()                                        +
       GetDividerSQL()                                          +
       GetInsertPrefixSQL(DATABASE_NEW, SCHEMA_NEW, TABLE_NEW)  +
       insertDML                                                +
       GetInsertSuffixSQL(DATABASE_OLD, SCHEMA_OLD, TABLE_OLD)  ;

/****************************************************************************************************
*                                                                                                   *
*  Helper functions                                                                                 *
*                                                                                                   *
****************************************************************************************************/

function InferDataType(db, schema, table, column, sourceQuery){

    var typeOf;

    typeOf = new IntegerType(db, schema, table, column, sourceQuery);
    if (typeOf.isCorrectType()) return typeOf;

    typeOf = new DoubleType(db, schema, table, column, sourceQuery);
    if (typeOf.isCorrectType()) return typeOf;

    typeOf = new BooleanType(db, schema, table, column, sourceQuery);        // May want to do a distinct and look for two values
    if (typeOf.isCorrectType()) return typeOf;

    typeOf = new TimestampType(db, schema, table, column, sourceQuery);
    if (typeOf.isCorrectType()) return typeOf;

    typeOf = new StringType(db, schema, table, column, sourceQuery);
    if (typeOf.isCorrectType()) return typeOf;

    return null;
}

/****************************************************************************************************
*                                                                                                   *
*  SQL Template Functions                                                                           *
*                                                                                                   *
****************************************************************************************************/

function GetCheckTypeSQL(insert, sourceQuery){

var sql = 
`
select      count(1)                              as TOTAL_COUNT,
            count("@~COLUMN~@")                   as NON_NULL_COUNT,
            count(${insert})                      as TO_TYPE_COUNT,
            sum(iff(trim("@~COLUMN~@")='', 1, 0)) as BLANK
--from        "@~DB~@"."@~SCHEMA~@"."@~TABLE~@";
from        (${sourceQuery})
`;

return sql;
}

function GetTableColumnsSQL(dbName, schemaName, tableName){

var sql = 
`
select  COLUMN_NAME 
from    ${dbName}.INFORMATION_SCHEMA.COLUMNS
where   TABLE_CATALOG = '${dbName}' and
        TABLE_SCHEMA  = '${schemaName}' and
        TABLE_NAME    = '${tableName}'
order by ORDINAL_POSITION;
`;
  
return sql;
}

function GetOpeningComments(){
return `
/**************************************************************************************************************
*                                                                                                             *
*   Copy and paste into a worksheet to create the typed table and insert into the new table from the old one. *
*                                                                                                             *
**************************************************************************************************************/
`;
}

function GetDDLPrefixSQL(db, schema, table){

var sql =
`
create or replace table "${db}"."${schema}"."${table}"
(
`;

    return sql;
}

function GetDDLSuffixSQL(){
    return "\n);";
}

function GetDividerSQL(){
return `
/**************************************************************************************************************
*                                                                                                             *
*   The SQL statement below this attempts to copy all rows from the string tabe to the typed table.           *
*                                                                                                             *
**************************************************************************************************************/
`;
}

function GetInsertPrefixSQL(db, schema, table){
var sql =
`\ninsert into "${db}"."${schema}"."${table}" select\n`;
return sql;
}

function GetInsertSuffixSQL(db, schema, table){
var sql =
`\nfrom "${db}"."${schema}"."${table}" ;`;
return sql;
}

//function GetInsertSuffixSQL(db, schema, table){
//var sql = '\nfrom "${db}"."${schema}"."${table}";';
//return sql;
//}


/****************************************************************************************************
*                                                                                                   *
*  SQL functions                                                                                    *
*                                                                                                   *
****************************************************************************************************/

function GetResultSet(sql){
    cmd1 = {sqlText: sql};
    stmt = snowflake.createStatement(cmd1);
    var rs;
    rs = stmt.execute();
    return rs;
}

function ExecuteNonQuery(queryString) {
    var out = '';
    cmd1 = {sqlText: queryString};
    stmt = snowflake.createStatement(cmd1);
    var rs;
    rs = stmt.execute();
}

function ExecuteSingleValueQuery(columnName, queryString) {
    var out;
    cmd1 = {sqlText: queryString};
    stmt = snowflake.createStatement(cmd1);
    var rs;
    try{
        rs = stmt.execute();
        rs.next();
        return rs.getColumnValue(columnName);
    }
    catch(err) {
        if (err.message.substring(0, 18) == "ResultSet is empty"){
            throw "ERROR: No rows returned in query.";
        } else {
            throw "ERROR: " + err.message.replace(/\n/g, " ");
        } 
    }
    return out;
}

function ExecuteFirstValueQuery(queryString) {
    var out;
    cmd1 = {sqlText: queryString};
    stmt = snowflake.createStatement(cmd1);
    var rs;
    try{
        rs = stmt.execute();
        rs.next();
        return rs.getColumnValue(1);
    }
    catch(err) {
        if (err.message.substring(0, 18) == "ResultSet is empty"){
            throw "ERROR: No rows returned in query.";
        } else {
            throw "ERROR: " + err.message.replace(/\n/g, " ");
        } 
    }
    return out;
}

function getQuery(sql){
    var cmd = {sqlText: sql};
    var query = new Query(snowflake.createStatement(cmd));
    try {
        query.resultSet = query.statement.execute();
    } catch (err) {
        throw "ERROR: " + err.message.replace(/\n/g, " ");
    }
    return query;
}

$$;

推荐阅读