amazon-s3 - 如何在加载 S3 时自动推断 CSV 文件的架构?
问题描述
语境
目前,我使用 Snowflake 作为数据仓库,使用 AWS 的 S3 作为数据湖。登陆 S3 的大多数文件都是Parquet格式。对于这些,我使用了 Snowflake 的一个新的受限功能(在此处记录),它可以自动检测 S3 上 parquet 文件中的模式,我可以使用它来生成CREATE TABLE
具有正确列名和推断数据类型的语句。此功能目前仅适用于 Apache Parquet、Avro 和 ORC 文件。我想找到一种方法来实现相同的预期目标,但适用于 CSV 文件。
我试图做的
这就是我目前推断 Parquet 文件架构的方式:
select generate_column_description(array_agg(object_construct(*)), 'table') as columns
from table (infer_schema(location=>'${LOCATION}', file_format=>'${FILE_FORMAT}'))
但是,如果我尝试将其指定FILE_FORMAT
为 csv,则该方法将失败。
我考虑过的其他方法:
- 将 S3 上的所有文件传输到 parquet(这涉及更多代码和基础设施设置,因此不是我的首选,尤其是我想在 s3 上保留一些文件的自然类型)
- 有一个脚本(例如使用 Python 中的 Pandas 之类的库)来推断 S3 中文件的架构(这也涉及更多代码,并且从在 Snowflake 中处理 parquet 文件的意义上来说会很奇怪,但非 parquet 文件由aws上的一些脚本)。
- 使用 Snowflake UDF 推断架构。还没有完全考虑我的选择。
期望的行为
当一个新的 csv 文件登陆 S3(在预先存在的 STAGE 上)时,我想推断模式,并能够生成CREATE TABLE
具有推断数据类型的语句。最好,我想在 Snowflake 中这样做,因为现有的上述模式推理解决方案在那里存在。如果需要,很高兴添加更多信息。
解决方案
更新:我修改了在无类型(所有字符串类型列)表中推断数据类型的 SP,它现在直接针对 Snowflake 阶段。项目代码可在此处获得:https ://github.com/GregPavlik/InferSchema
我写了一个存储过程来帮助解决这个问题;但是,它的唯一目标是推断无类型列的数据类型。它的工作原理如下:
- 将 CSV 加载到表中,所有列都定义为 varchars。
- 使用针对新表的查询调用 SP(要点是仅获取所需的列并限制行数以保持类型推断时间合理)。
- 在 SP 调用中还有旧位置和新位置的数据库、模式和表——旧的带有所有 varchar,新的带有推断的类型。
然后 SP 将推断数据类型并创建两个 SQL 语句。一条语句将使用推断的数据类型创建新表。一条语句将使用适当的包装器(例如 try_multi_timestamp(),一个扩展 try_to_timestamp() 以尝试各种常见格式的 UDF)从无类型(全 varchar)表复制到新表。
我的意思是扩展它,以便它根本不需要无类型(所有 varchar)表,但还没有解决它。既然它出现在这里,我可能会回过头来用这种能力更新 SP。您可以指定直接从阶段读取的查询,但您必须使用 $1、$2... 和列名的别名(否则 DDL 将尝试创建像 $1 这样的列名)。如果查询直接针对某个阶段运行,对于旧的数据库、模式和表,您可以输入任何内容,因为这仅用于从 select 语句生成插入。
-- This shows how to use on the Snowflake TPCH sample, but could be any query.
-- Keep the row count down to reduce the time it take to infer the types.
call infer_data_types('select * from SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM limit 10000',
'SNOWFLAKE_SAMPLE_DATA', 'TPCH_SF1', 'LINEITEM',
'TEST', 'PUBLIC', 'LINEITEM');
create or replace procedure INFER_DATA_TYPES(SOURCE_QUERY string,
DATABASE_OLD string,
SCHEMA_OLD string,
TABLE_OLD string,
DATABASE_NEW string,
SCHEMA_NEW string,
TABLE_NEW string)
returns string
language javascript
as
$$
/****************************************************************************************************
* *
* DataType Classes
* *
****************************************************************************************************/
class Query{
constructor(statement){
this.statement = statement;
}
}
class DataType {
constructor(db, schema, table, column, sourceQuery) {
this.db = db;
this.schema = schema;
this.table = table;
this.sourceQuery = sourceQuery
this.column = column;
this.insert = '"@~COLUMN~@"';
this.totalCount = 0;
this.notNullCount = 0;
this.typeCount = 0;
this.blankCount = 0;
this.minTypeOf = 0.95;
this.minNotNull = 1.00;
}
setSQL(sqlTemplate){
this.sql = sqlTemplate;
this.sql = this.sql.replace(/@~DB~@/g, this.db);
this.sql = this.sql.replace(/@~SCHEMA~@/g, this.schema);
this.sql = this.sql.replace(/@~TABLE~@/g, this.table);
this.sql = this.sql.replace(/@~COLUMN~@/g, this.column);
}
getCounts(){
var rs;
rs = GetResultSet(this.sql);
rs.next();
this.totalCount = rs.getColumnValue("TOTAL_COUNT");
this.notNullCount = rs.getColumnValue("NON_NULL_COUNT");
this.typeCount = rs.getColumnValue("TO_TYPE_COUNT");
this.blankCount = rs.getColumnValue("BLANK");
}
isCorrectType(){
return (this.typeCount / (this.notNullCount - this.blankCount) >= this.minTypeOf);
}
isNotNull(){
return (this.notNullCount / this.totalCount >= this.minNotNull);
}
}
class TimestampType extends DataType{
constructor(db, schema, table, column, sourceQuery){
super(db, schema, table, column, sourceQuery)
this.syntax = "timestamp";
this.insert = 'try_multi_timestamp(trim("@~COLUMN~@"))';
this.sourceQuery = SOURCE_QUERY;
this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
this.getCounts();
}
}
class IntegerType extends DataType{
constructor(db, schema, table, column, sourceQuery){
super(db, schema, table, column, sourceQuery)
this.syntax = "number(38,0)";
this.insert = 'try_to_number(trim("@~COLUMN~@"), 38, 0)';
this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
this.getCounts();
}
}
class DoubleType extends DataType{
constructor(db, schema, table, column, sourceQuery){
super(db, schema, table, column, sourceQuery)
this.syntax = "double";
this.insert = 'try_to_double(trim("@~COLUMN~@"))';
this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
this.getCounts();
}
}
class BooleanType extends DataType{
constructor(db, schema, table, column, sourceQuery){
super(db, schema, table, column, sourceQuery)
this.syntax = "boolean";
this.insert = 'try_to_boolean(trim("@~COLUMN~@"))';
this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
this.getCounts();
}
}
// Catch all is STRING data type
class StringType extends DataType{
constructor(db, schema, table, column, sourceQuery){
super(db, schema, table, column, sourceQuery)
this.syntax = "string";
this.totalCount = 1;
this.notNullCount = 0;
this.typeCount = 1;
this.minTypeOf = 0;
this.minNotNull = 1;
}
}
/****************************************************************************************************
* *
* Main function *
* *
****************************************************************************************************/
var pass = 0;
var column;
var typeOf;
var ins = '';
var newTableDDL = '';
var insertDML = '';
var columnRS = GetResultSet(GetTableColumnsSQL(DATABASE_OLD, SCHEMA_OLD, TABLE_OLD));
while (columnRS.next()){
pass++;
if(pass > 1){
newTableDDL += ",\n";
insertDML += ",\n";
}
column = columnRS.getColumnValue("COLUMN_NAME");
typeOf = InferDataType(DATABASE_OLD, SCHEMA_OLD, TABLE_OLD, column, SOURCE_QUERY);
newTableDDL += '"' + typeOf.column + '" ' + typeOf.syntax;
ins = typeOf.insert;
insertDML += ins.replace(/@~COLUMN~@/g, typeOf.column);
}
return GetOpeningComments() +
GetDDLPrefixSQL(DATABASE_NEW, SCHEMA_NEW, TABLE_NEW) +
newTableDDL +
GetDDLSuffixSQL() +
GetDividerSQL() +
GetInsertPrefixSQL(DATABASE_NEW, SCHEMA_NEW, TABLE_NEW) +
insertDML +
GetInsertSuffixSQL(DATABASE_OLD, SCHEMA_OLD, TABLE_OLD) ;
/****************************************************************************************************
* *
* Helper functions *
* *
****************************************************************************************************/
function InferDataType(db, schema, table, column, sourceQuery){
var typeOf;
typeOf = new IntegerType(db, schema, table, column, sourceQuery);
if (typeOf.isCorrectType()) return typeOf;
typeOf = new DoubleType(db, schema, table, column, sourceQuery);
if (typeOf.isCorrectType()) return typeOf;
typeOf = new BooleanType(db, schema, table, column, sourceQuery); // May want to do a distinct and look for two values
if (typeOf.isCorrectType()) return typeOf;
typeOf = new TimestampType(db, schema, table, column, sourceQuery);
if (typeOf.isCorrectType()) return typeOf;
typeOf = new StringType(db, schema, table, column, sourceQuery);
if (typeOf.isCorrectType()) return typeOf;
return null;
}
/****************************************************************************************************
* *
* SQL Template Functions *
* *
****************************************************************************************************/
function GetCheckTypeSQL(insert, sourceQuery){
var sql =
`
select count(1) as TOTAL_COUNT,
count("@~COLUMN~@") as NON_NULL_COUNT,
count(${insert}) as TO_TYPE_COUNT,
sum(iff(trim("@~COLUMN~@")='', 1, 0)) as BLANK
--from "@~DB~@"."@~SCHEMA~@"."@~TABLE~@";
from (${sourceQuery})
`;
return sql;
}
function GetTableColumnsSQL(dbName, schemaName, tableName){
var sql =
`
select COLUMN_NAME
from ${dbName}.INFORMATION_SCHEMA.COLUMNS
where TABLE_CATALOG = '${dbName}' and
TABLE_SCHEMA = '${schemaName}' and
TABLE_NAME = '${tableName}'
order by ORDINAL_POSITION;
`;
return sql;
}
function GetOpeningComments(){
return `
/**************************************************************************************************************
* *
* Copy and paste into a worksheet to create the typed table and insert into the new table from the old one. *
* *
**************************************************************************************************************/
`;
}
function GetDDLPrefixSQL(db, schema, table){
var sql =
`
create or replace table "${db}"."${schema}"."${table}"
(
`;
return sql;
}
function GetDDLSuffixSQL(){
return "\n);";
}
function GetDividerSQL(){
return `
/**************************************************************************************************************
* *
* The SQL statement below this attempts to copy all rows from the string tabe to the typed table. *
* *
**************************************************************************************************************/
`;
}
function GetInsertPrefixSQL(db, schema, table){
var sql =
`\ninsert into "${db}"."${schema}"."${table}" select\n`;
return sql;
}
function GetInsertSuffixSQL(db, schema, table){
var sql =
`\nfrom "${db}"."${schema}"."${table}" ;`;
return sql;
}
//function GetInsertSuffixSQL(db, schema, table){
//var sql = '\nfrom "${db}"."${schema}"."${table}";';
//return sql;
//}
/****************************************************************************************************
* *
* SQL functions *
* *
****************************************************************************************************/
function GetResultSet(sql){
cmd1 = {sqlText: sql};
stmt = snowflake.createStatement(cmd1);
var rs;
rs = stmt.execute();
return rs;
}
function ExecuteNonQuery(queryString) {
var out = '';
cmd1 = {sqlText: queryString};
stmt = snowflake.createStatement(cmd1);
var rs;
rs = stmt.execute();
}
function ExecuteSingleValueQuery(columnName, queryString) {
var out;
cmd1 = {sqlText: queryString};
stmt = snowflake.createStatement(cmd1);
var rs;
try{
rs = stmt.execute();
rs.next();
return rs.getColumnValue(columnName);
}
catch(err) {
if (err.message.substring(0, 18) == "ResultSet is empty"){
throw "ERROR: No rows returned in query.";
} else {
throw "ERROR: " + err.message.replace(/\n/g, " ");
}
}
return out;
}
function ExecuteFirstValueQuery(queryString) {
var out;
cmd1 = {sqlText: queryString};
stmt = snowflake.createStatement(cmd1);
var rs;
try{
rs = stmt.execute();
rs.next();
return rs.getColumnValue(1);
}
catch(err) {
if (err.message.substring(0, 18) == "ResultSet is empty"){
throw "ERROR: No rows returned in query.";
} else {
throw "ERROR: " + err.message.replace(/\n/g, " ");
}
}
return out;
}
function getQuery(sql){
var cmd = {sqlText: sql};
var query = new Query(snowflake.createStatement(cmd));
try {
query.resultSet = query.statement.execute();
} catch (err) {
throw "ERROR: " + err.message.replace(/\n/g, " ");
}
return query;
}
$$;
推荐阅读
- python - 按特定 ID 过滤所有帖子
- angular - rxjs 订阅函数 - 等待事件
- unit-testing - TDD:在实现另一种(公共)方法之前无法测试一种方法的行为,反之亦然
- discord - Discord.js 反应问题
- sql - Postgres中的递归父子关系
- mysql - mysql替换路径
- javascript - 如何向 Laravel 发送数据和文件?
- c# - 在意外的时间调用了一个方法。一次只能注册一个 PrintTaskRequested 事件的处理程序
- php - 如何从第三方应用程序中创建的 webhook 接收数据?
- r - 指定列时,像 fread 和 vroom 这样的 csv 阅读器是否会读取所有列?