amazon-s3 - 使用 COPY 命令将最新文件从 S3 阶段复制到雪花表
问题描述
我有一个按日期划分的 S3 阶段,我希望每天每小时都有一个文件。我只想使用 COPY 命令从 S3 阶段选择最新文件。
如何指定复制命令以仅选择最新文件?我读到雪花将加载元数据的历史记录保留 64 天,以避免加载相同的文件。但我想知道是否有任何方法可以通过 COPY 命令仅选择最新文件。
FILE_FORMAT=(type=csv
compression=gzip
field_delimiter=','
skip_header=1
field_optionally_enclosed_by='\"'
empty_field_as_null=true
NULL_IF = ('NULL','null','')
date_format='yyyy-mm-dd' time_format='hh24:mi:ss.ff'
)
解决方案
我认为您将需要一个存储过程来执行此操作。我从我编写的项目中提取了一些代码以批量加载文件https://github.com/GregPavlik/snowflake_bulk_loader/blob/master/02.%20Bulk%20Load%20-%20Runtime.sql
第一个挑战是文件的 LIST 命令返回的日期格式与标准时间戳文字格式不同。有一个 UDF 可以从上次修改时间转换为雪花时间戳。第二个挑战是 LIST 命令返回路径中的阶段名称,但方式略有不同,具体取决于它是内部还是外部以及外部取决于云主机。还有另一个 UDF 从路径中去除舞台名称。
从那里,SP 列出文件,获取最新的文件,并发出复制命令。您可以按照它正在做的事情并将适当的点更改为您的阶段名称、表名称和复制命令。
这假设您使用的是命名阶段(外部阶段被命名为使用 @ 引用的内部阶段)。如果没有,您可以调整以 @ 为前缀的阶段名称的代码。
create or replace function LAST_MODIFIED_TO_TIMESTAMP(LAST_MODIFIED string)
returns timestamp_tz
as
$$
to_timestamp_tz(left(LAST_MODIFIED, len(LAST_MODIFIED) - 4) || ' ' || '00:00', 'DY, DD MON YYYY HH:MI:SS TZH:TZM')
$$;
create or replace function STAGE_PATH_SHORTEN(FILE_PATH string)
returns string
language javascript
as
$$
/*
Removes the cloud provider prefix and stage name from the file path
*/
var s3 = FILE_PATH.search(/s3:\/\//i);
if ( s3 != -1){
return FILE_PATH.substring(FILE_PATH.indexOf("/", s3 + 5) + 1);
}
var azure = FILE_PATH.search(/azure:\/\//i);
if ( azure != -1){
return FILE_PATH.substring(FILE_PATH.indexOf("/", azure + 8) + 1);
}
var newStyleInternal = FILE_PATH.search(/stages\//i);
if (newStyleInternal != -1){
return FILE_PATH.substring(FILE_PATH.indexOf("/", newStyleInternal + 7) + 1);
}
var newStyleInternal = FILE_PATH.search(/stages[a-zA-Z0-9]{4,10}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{12}\//i);
if (newStyleInternal != -1){
return FILE_PATH.substring(FILE_PATH.indexOf("/", newStyleInternal) + 1);
}
var stageRegExp = "/";
var re = new RegExp(stageRegExp, "i");
var stageInStr = FILE_PATH.search(re);
if (stageInStr != -1){
return FILE_PATH.substring(FILE_PATH.indexOf("/", stageInStr) + 1);
}
throw "Unknown file path type."
$$;
create or replace procedure INGEST_MOST_RECENT_FILE(STAGE_NAME string, TARGET_TABLE string)
returns string
language javascript
execute as caller
as
$$
try{
getResultSet(`list @${STAGE_NAME}`);
var fileName = executeSingleValueQuery('FILE_NAME',
`select stage_path_shorten("name") as FILE_NAME, LAST_MODIFIED_TO_TIMESTAMP("last_modified") as LAST_MODIFIED_TS
from table(result_scan(last_query_id())) order by LAST_MODIFIED_TS desc limit 1;`);
// Modify with your COPY INTO statement here, but leave the last part -- files=('${fileName}') -- unmodified.
var copyStatus = executeSingleValueQuery("status",
`
copy into ${TARGET_TABLE} from @${STAGE_NAME} file_format=(type=CSV) files=('${fileName}') ;
`
);
return `Copy status: ${copyStatus}.`;
} catch (err) {
return `Error: ${err.message}.`;
}
function getResultSet(sql){
cmd = {sqlText: sql};
stmt = snowflake.createStatement(cmd);
var rs;
rs = stmt.execute();
return rs;
}
function executeSingleValueQuery(columnName, queryString) {
var out;
cmd1 = {sqlText: queryString};
stmt = snowflake.createStatement(cmd1);
var rs;
try{
rs = stmt.execute();
rs.next();
return rs.getColumnValue(columnName);
}
catch(err) {
if (err.message.substring(0, 18) == "ResultSet is empty"){
throw "ERROR: No rows returned in query.";
} else {
throw "ERROR: " + err.message.replace(/\n/g, " ");
}
}
return out;
}
$$;
call ingest_most_recent_file('TEST_STAGE', 'TARGET_TABLE');
推荐阅读
- javascript - 将平面对象数组转换为嵌套数组
- javascript - 为什么我不能删除 Object.defineProperties 中定义的 getter/setter?
- vim - 如何更改 vim 默认描述?
- python - 在 tensorflow v2 的图中使用 tf.timestamp()
- javascript - 控制器返回值但未显示在 ASP.NET Core 的 JqGrid 中
- reactjs - react-dnd 失败时延迟返回原位
- spring-boot - 没有@SpringBootApplication时如何使用@JsonTest?
- flutter - 当系统区域设置更改时,iOS 是否总是重新启动应用程序?
- python-3.x - 如何通过一键升序和一键降序按值对嵌套列表的字典进行排序?
- masstransit - 如何使用 Bus.Factory.CreateUsingRabbitMq 配置 Mass Transit Courier?