首页 > 解决方案 > 使用 COPY 命令将最新文件从 S3 阶段复制到雪花表

问题描述

我有一个按日期划分的 S3 阶段,我希望每天每小时都有一个文件。我只想使用 COPY 命令从 S3 阶段选择最新文件。

如何指定复制命令以仅选择最新文件?我读到雪花将加载元数据的历史记录保留 64 天,以避免加载相同的文件。但我想知道是否有任何方法可以通过 COPY 命令仅选择最新文件。

FILE_FORMAT=(type=csv 
            compression=gzip 
            field_delimiter=','  
            skip_header=1 
            field_optionally_enclosed_by='\"' 
            empty_field_as_null=true 
            NULL_IF = ('NULL','null','') 
            date_format='yyyy-mm-dd' time_format='hh24:mi:ss.ff' 
            )

标签: amazon-s3snowflake-cloud-data-platform

解决方案


我认为您将需要一个存储过程来执行此操作。我从我编写的项目中提取了一些代码以批量加载文件https://github.com/GregPavlik/snowflake_bulk_loader/blob/master/02.%20Bulk%20Load%20-%20Runtime.sql

第一个挑战是文件的 LIST 命令返回的日期格式与标准时间戳文字格式不同。有一个 UDF 可以从上次修改时间转换为雪花时间戳。第二个挑战是 LIST 命令返回路径中的阶段名称,但方式略有不同,具体取决于它是内部还是外部以及外部取决于云主机。还有另一个 UDF 从路径中去除舞台名称。

从那里,SP 列出文件,获取最新的文件,并发出复制命令。您可以按照它正在做的事情并将适当的点更改为您的阶段名称、表名称和复制命令。

这假设您使用的是命名阶段(外部阶段被命名为使用 @ 引用的内部阶段)。如果没有,您可以调整以 @ 为前缀的阶段名称的代码。

create or replace function LAST_MODIFIED_TO_TIMESTAMP(LAST_MODIFIED string) 
returns timestamp_tz
as
$$
    to_timestamp_tz(left(LAST_MODIFIED, len(LAST_MODIFIED) - 4) || ' ' || '00:00', 'DY, DD MON YYYY HH:MI:SS TZH:TZM')
$$;


create or replace function STAGE_PATH_SHORTEN(FILE_PATH string)
returns string
language javascript
as
$$
    /*
        Removes the cloud provider prefix and stage name from the file path
    */
    var s3 = FILE_PATH.search(/s3:\/\//i);

    if ( s3 != -1){
        return FILE_PATH.substring(FILE_PATH.indexOf("/", s3 + 5) + 1);
    }

    var azure = FILE_PATH.search(/azure:\/\//i);

    if ( azure != -1){
        return FILE_PATH.substring(FILE_PATH.indexOf("/", azure + 8) + 1);
    }

    var newStyleInternal = FILE_PATH.search(/stages\//i);

    if (newStyleInternal != -1){
        return FILE_PATH.substring(FILE_PATH.indexOf("/", newStyleInternal + 7) + 1);
    }

    var newStyleInternal = FILE_PATH.search(/stages[a-zA-Z0-9]{4,10}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{12}\//i);

    if (newStyleInternal != -1){
        return FILE_PATH.substring(FILE_PATH.indexOf("/", newStyleInternal) + 1);
    }

    var stageRegExp = "/";
    var re = new RegExp(stageRegExp, "i");

    var stageInStr = FILE_PATH.search(re);

    if (stageInStr != -1){
        return FILE_PATH.substring(FILE_PATH.indexOf("/", stageInStr) + 1);
    }

    throw "Unknown file path type."
$$;


create or replace procedure INGEST_MOST_RECENT_FILE(STAGE_NAME string, TARGET_TABLE string)
returns string
language javascript
execute as caller
as
$$

try{

    getResultSet(`list @${STAGE_NAME}`);
    var fileName = executeSingleValueQuery('FILE_NAME', 
                                           `select stage_path_shorten("name") as FILE_NAME, LAST_MODIFIED_TO_TIMESTAMP("last_modified") as LAST_MODIFIED_TS 
                                            from table(result_scan(last_query_id())) order by LAST_MODIFIED_TS desc limit 1;`);
    
    // Modify with your COPY INTO statement here, but leave the last part -- files=('${fileName}') -- unmodified.
    var copyStatus = executeSingleValueQuery("status",

`
copy into ${TARGET_TABLE} from @${STAGE_NAME} file_format=(type=CSV) files=('${fileName}') ;
`

    );
    
    return `Copy status: ${copyStatus}.`;

} catch (err) {
    return `Error: ${err.message}.`;
}

function getResultSet(sql){
    cmd = {sqlText: sql};
    stmt = snowflake.createStatement(cmd);
    var rs;
    rs = stmt.execute();
    return rs;
}

function executeSingleValueQuery(columnName, queryString) {
    var out;
    cmd1 = {sqlText: queryString};
    stmt = snowflake.createStatement(cmd1);
    var rs;
    try{
        rs = stmt.execute();
        rs.next();
        return rs.getColumnValue(columnName);
    }
    catch(err) {
        if (err.message.substring(0, 18) == "ResultSet is empty"){
            throw "ERROR: No rows returned in query.";
        } else {
            throw "ERROR: " + err.message.replace(/\n/g, " ");
        } 
    }
    return out;
}

$$;

call ingest_most_recent_file('TEST_STAGE', 'TARGET_TABLE');

推荐阅读