首页 > 解决方案 > 如何使用 Kiba-ETL 转换嵌套的 JSON 有效负载?

问题描述

我想使用 Kiba-ETL 将嵌套的 JSON 有效负载转换为关系表。这是一个简化的伪 JSON 有效负载:

{
  "bookings": [
    {
      "bookingNumber": "1111",
      "name": "Booking 1111",
      "services": [
        {
          "serviceNumber": "45",
          "serviceName": "Extra Service"
        }
      ]
    },
    {
      "bookingNumber": "2222",
      "name": "Booking 2222",
      "services": [
        {
          "serviceNumber": "1",
          "serviceName": "Super Service"
        },
        {
          "serviceNumber": "2",
          "serviceName": "Bonus Service"
        }
      ]
    }
  ]
}

如何将此有效负载转换为两个表:

Kiba::Common::Transforms::EnumerableExploder我在 wiki、blog 等的帮助下阅读了关于产生多行的信息 。

您会通过产生多行(预订和多项服务)来解决我的用例,还是会实施一个Destination接收整个预订并调用一些子目的地(即创建或更新服务)?

标签: rubykiba-etl

解决方案


Kiba的作者在这里!

这是一个常见的要求,但它可以(这不是 Kiba 特有的)处理起来或多或少复杂。以下是您需要考虑的几点。

外键处理

这里的主要问题是,一旦插入服务和预订,您将希望保持它们之间的关系。

使用业务键的外键

处理此问题的第一种(最简单的)方法是对“预订号”使用外键约束,并确保在每个服务行中插入该预订号,以便稍后在查询中使用它。如果您这样做(请参阅https://stackoverflow.com/a/18435114/20302),您必须在预订表目标中设置“预订号”的唯一约束。

使用主键的外键

如果您更喜欢使用booking_idwhich 指向bookingsid键,那么事情会更复杂一些。

如果这是针对空表的一次性导入,我建议您使用以下方式任意强制主键:

transform do |r|
  @row_index ||= 0
  @row_index += 1
  r.merge(id: @row_index)
end

如果这不是一次性导入,您将必须: * 在第一遍中更新预订 * 在第二遍中,查找(通过 SQL 查询)“预订”以找出id要存储的内容booking_id,然后更新插入服务

如您所见,这需要更多工作,因此如果您对此没有强烈要求,请坚持使用选项 1(尽管从长远来看,选项 2 更可靠)。

示例实现(使用 Kiba Pro 和业务密钥)

实现这一点的最简单方法(假设您的目标是 Postgres)是使用 Kiba Pro 的SQL Bulk Insert/Upsert destination

它会这样(单程):

extend Kiba::DSLExtensions::Config
config :kiba, runner: Kiba::StreamingRunner

source Kiba::Common::Sources::Enumerable, -> { Dir["input/*.json"] }

transform { |r| JSON.parse(IO.read(r)).fetch('bookings') }

transform Kiba::Common::Transforms::EnumerableExploder

# SNIP (remapping / renaming of fields etc)

first_destination = nil

destination Kiba::Pro::Destinations::SQLBulkInsert,
  row_pre_processor: -> (row) { row.except("services") },
  dataset: -> (dataset) {
    dataset.insert_conflict(target: :booking_number)
  },
  after_read: -> (d) { first_destination = d }

destination Kiba::Pro::Destinations::SQLBulkInsert,
  row_pre_processor: -> (row) { row.fetch("services") },
  dataset: -> (dataset) {
    dataset.insert_conflict(target: :service_number)
  },
  before_flush: -> { first_destination.flush }

在这里,我们遍历每个输入文件,对其进行解析并获取“bookings”,然后为“bookings”的每个元素生成一行。

我们有 2 个目的地,做“upsert”(插入或更新),加上一个技巧来确保我们在插入子行之前保存父行,以避免由于缺少指向记录而失败。

您当然可以自己实现它,但这有点工作!

如果您需要使用基于主键的外键,您将(可能)分成 2 遍(每个目的地一个),然后在中间添加某种形式的查找。

结论

我知道这不是微不足道的(取决于您需要什么,以及您是否使用 Kiba Pro),但至少我正在分享我在这种情况下使用的模式。

希望它有点帮助!


推荐阅读