首页 > 解决方案 > 将数据从无模式数据库迁移到关系数据库:MongoDB 到 Snowflake

问题描述

我们有一个用例引导我写这篇文章,我相信你们中的许多人都会遇到这种情况。情况是通过单个 Talend 作业将多个集合从 MongoDB 迁移到 Snowflake 数据库,并将集合的顶级节点保留为 Snowflake 表中的单个字段。

现在我们知道 Talend 不支持 MongoDB 源的动态模式,因为 MongoDB 集合不强制使用模式,这意味着我们必须为我们想要摄取的每个现有/新集合创建单独的作业/子作业必须重新设计工作以应对文档中的未来更改,同时确保其始终有效,因此我们必须寻找替代解决方案。

这是方法,

第一步:从 MongoDB 集合中获取所有顶级键及其类型。我们使用了$objectToArarray的聚合来将所有顶部键和值对转换为文档数组,然后使用$unwind$group以及$addToSet来获取整个集合中不同的键和值类型。

 {
"_id" : "1",
"keys" : [ 
    "field1~string", 
    "field2~object", 
    "filed3~date",
    "_id~objectId"
 ]

}

第二步:在 Mongo Datatype 和 Snowflake Datatype 之间创建一对一的映射。我们创建了一个名为“ dataTypes ”的哈希映射来存储这些信息。或者,此信息可以存储在表格或文件等中。

 java.util.Map<String,String> dataTypes = new java.util.HashMap<String,String>();
 dataTypes.put("string","VARCHAR");
 dataTypes.put("int","NUMBER");
 dataTypes.put("objectId","VARCHAR");
 dataTypes.put("object","VARIANT");
 dataTypes.put("date","TIMESTAMP_LTZ");
 dataTypes.put("array","VARCHAR");
 dataTypes.put("bool","BOOLEAN");

第三步:将键与雪花进行比较:首先我们查询雪花INFORMATION_SCHEMA表是否存在,如果不存在则创建表,如果存在则检查文档中字段的更改并添加或修改雪花表中的那些列。通过使用第二步中的“数据类型映射”并迭代第一步中的键来生成 DDL 脚本

第四步:使用mongoexport命令将数据从 MongoDB 卸载到本地文件系统:

mongoexport --db <databaseName> --collection <collectionName> --type=csv --fields=<fieldList> --out <filename>

这是从第一步中的键准备的。

第五步:使用Snowsql的PUT命令将 .csv 文件从本地文件系统暂存到雪花暂存位置。

snowsql -d <database> -s <schema> -o exit_on_error=true -o log_level=DEBUG  -q  'put <fileName> @<internalStage> OVERWRITE=TRUE';

第六步:将数据从暂存位置加载到雪花表

COPY INTO <tableName> FROM @<internalStage> 
[file_format=<fileFormat>] [pattern=<regex_pattern>]

在这里指定 file_format 和模式是可选的,我们使用了正则表达式,因为我们在一个雪花阶段为每个集合暂存多个文件。

第七步:维护一个集合列表,该列表可以放在本地文件系统的文件中或数据库表中,在 Talend 作业中迭代集合列表,并通过参数化集合名称、表名称,通过上述步骤处理每个集合,文件名和暂存名称等在作业中。

标签: mongodbsnowflake-cloud-data-platformtalenddynamic-schema

解决方案


一种解决方案是将 Mongodb 集合的记录加载到variant类型的 Snowflake 字段中。然后,创建一个 Snowflake 视图以使用 Snowflake 的点表示法提取特定键。

将您的数据导出为 JSON 类型。

mongoexport --type=json --out <filename>

将该导出加载到具有如下结构的表中。

create table collection_name_exports (
  data variant,  -- This column will contain your export
  inserted_at datetime default current_timestamp()
);

根据需要将键提取到视图的列中。

create view collection_name_view as
select
  collection_name_exports:key1 as field1,
  collection_name_exports:key2 as field2
from collection_name_exports

推荐阅读