首页 > 解决方案 > KSQL 流 - 遍历 JSON

问题描述

如何在 KSQL 流中迭代 JSON:

我的 JSON 看起来像:

{
   "Obj" {
       "ID" : "1"
    },
    "KeyValues": {
        "Key1": "value1",
        "Key2": "value2",
        "Key3": "value3",
        "Key4": "value4",
        "Key5": "value5",
        "Key6": "value6",
        "Key7": "value7",
        "Key8": "value8",
        "Key9": "value9",
        "Key10": "value10",
            |
            |
            |
            |

        "KeyN": "valueN"
  }
}

我怎样才能在 KSQL 中低于 o/p 。需要为 N 个元素迭代 JSON 对象并列出如下。

   ID    KEY              VALUE
----------------------------------    
   1     Key1            value1
   1     Key2            value2
   1     Key3            value3
   1     Key4            value4
   1     Key5            value5
   1     Key6            value6
   1     Key7            value7
   1     Key8            value8
   1     Key9            value9
   1     Key10           value10
   1            |
   1            |
   1            |
   1            |
   1 
   1      KeyN          valueN

提前致谢。

标签: apache-kafkaksqldbconfluent-platform

解决方案


似乎UDTF是适合您的解决方案。您可以将explode视为一个 UDTF 示例,它接收一个数组,然后输出 N 行,每个元素一个。

您的 UDTF 的签名将是类似的:

@Udtf(schema = "STRUCT<key VARCHAR, value VARCHAR>")
public <T> List<Struct> expandMapEntries(final Map<String, String> input) {
  // output a list of key value pairs as a struct from 'input'
}

然后您可以使用此 UDTF 并从中选择字段(如下所示):

CREATE STREAM expanded AS SELECT EXPAND_MAP_ENTRIES(KeyValues) AS keyVals FROM source;
CREATE STREAM flattened AS keyVals->key as `KEY`, keyVals->value AS VALUE FROM expanded;

让我知道这是否对您有用,并随时联系社区 slack (@almog) - 我对这个用例非常感兴趣。


推荐阅读