ruby - 如何使用 Kiba-ETL 转换嵌套的 JSON 有效负载?
问题描述
我想使用 Kiba-ETL 将嵌套的 JSON 有效负载转换为关系表。这是一个简化的伪 JSON 有效负载:
{
"bookings": [
{
"bookingNumber": "1111",
"name": "Booking 1111",
"services": [
{
"serviceNumber": "45",
"serviceName": "Extra Service"
}
]
},
{
"bookingNumber": "2222",
"name": "Booking 2222",
"services": [
{
"serviceNumber": "1",
"serviceName": "Super Service"
},
{
"serviceNumber": "2",
"serviceName": "Bonus Service"
}
]
}
]
}
如何将此有效负载转换为两个表:
- 预订
- services(每项服务都属于一个预订)
Kiba::Common::Transforms::EnumerableExploder
我在 wiki、blog 等的帮助下阅读了关于产生多行的信息 。
您会通过产生多行(预订和多项服务)来解决我的用例,还是会实施一个Destination
接收整个预订并调用一些子目的地(即创建或更新服务)?
解决方案
Kiba的作者在这里!
这是一个常见的要求,但它可以(这不是 Kiba 特有的)处理起来或多或少复杂。以下是您需要考虑的几点。
外键处理
这里的主要问题是,一旦插入服务和预订,您将希望保持它们之间的关系。
使用业务键的外键
处理此问题的第一种(最简单的)方法是对“预订号”使用外键约束,并确保在每个服务行中插入该预订号,以便稍后在查询中使用它。如果您这样做(请参阅https://stackoverflow.com/a/18435114/20302),您必须在预订表目标中设置“预订号”的唯一约束。
使用主键的外键
如果您更喜欢使用booking_id
which 指向bookings
表id
键,那么事情会更复杂一些。
如果这是针对空表的一次性导入,我建议您使用以下方式任意强制主键:
transform do |r|
@row_index ||= 0
@row_index += 1
r.merge(id: @row_index)
end
如果这不是一次性导入,您将必须: * 在第一遍中更新预订 * 在第二遍中,查找(通过 SQL 查询)“预订”以找出id
要存储的内容booking_id
,然后更新插入服务
如您所见,这需要更多工作,因此如果您对此没有强烈要求,请坚持使用选项 1(尽管从长远来看,选项 2 更可靠)。
示例实现(使用 Kiba Pro 和业务密钥)
实现这一点的最简单方法(假设您的目标是 Postgres)是使用 Kiba Pro 的SQL Bulk Insert/Upsert destination。
它会这样(单程):
extend Kiba::DSLExtensions::Config
config :kiba, runner: Kiba::StreamingRunner
source Kiba::Common::Sources::Enumerable, -> { Dir["input/*.json"] }
transform { |r| JSON.parse(IO.read(r)).fetch('bookings') }
transform Kiba::Common::Transforms::EnumerableExploder
# SNIP (remapping / renaming of fields etc)
first_destination = nil
destination Kiba::Pro::Destinations::SQLBulkInsert,
row_pre_processor: -> (row) { row.except("services") },
dataset: -> (dataset) {
dataset.insert_conflict(target: :booking_number)
},
after_read: -> (d) { first_destination = d }
destination Kiba::Pro::Destinations::SQLBulkInsert,
row_pre_processor: -> (row) { row.fetch("services") },
dataset: -> (dataset) {
dataset.insert_conflict(target: :service_number)
},
before_flush: -> { first_destination.flush }
在这里,我们遍历每个输入文件,对其进行解析并获取“bookings”,然后为“bookings”的每个元素生成一行。
我们有 2 个目的地,做“upsert”(插入或更新),加上一个技巧来确保我们在插入子行之前保存父行,以避免由于缺少指向记录而失败。
您当然可以自己实现它,但这有点工作!
如果您需要使用基于主键的外键,您将(可能)分成 2 遍(每个目的地一个),然后在中间添加某种形式的查找。
结论
我知道这不是微不足道的(取决于您需要什么,以及您是否使用 Kiba Pro),但至少我正在分享我在这种情况下使用的模式。
希望它有点帮助!
推荐阅读
- reactjs - React TypeScript 16.8 - 如何在没有 Helmet 的情况下更改页面标题
- shell - 用于生成文件中的可选字符串的 Shell 通配符/glob
- android - MVVM 我应该从哪里订阅 Rx?
- mysql - 我正在尝试打开一个游标。尽管与 mysql 数据库有连接,但它无法正常工作
- c# - 如何使用 c# 执行时间比较来自 2 个数组的单个属性应该是最少的
- java - 如何在corda中创建服务以将任意数据存储在节点数据库中的任意位置而不是保险库内
- reactjs - 如何不使用“react-intl-translations-manager”删除现有翻译?
- excel - 如何将文本从 Excel 发送到一个单词(单元格)为粗体的 Word?
- apache-kafka - Kafka 连接与多个消息队列的集成
- c# - 我想根据间隔(15,30,,45,60)组合我的时间段,并在 sql 存储过程中添加列