apache-kafka - KSQL 流 - 遍历 JSON
问题描述
如何在 KSQL 流中迭代 JSON:
我的 JSON 看起来像:
{
"Obj" {
"ID" : "1"
},
"KeyValues": {
"Key1": "value1",
"Key2": "value2",
"Key3": "value3",
"Key4": "value4",
"Key5": "value5",
"Key6": "value6",
"Key7": "value7",
"Key8": "value8",
"Key9": "value9",
"Key10": "value10",
|
|
|
|
"KeyN": "valueN"
}
}
我怎样才能在 KSQL 中低于 o/p 。需要为 N 个元素迭代 JSON 对象并列出如下。
ID KEY VALUE
----------------------------------
1 Key1 value1
1 Key2 value2
1 Key3 value3
1 Key4 value4
1 Key5 value5
1 Key6 value6
1 Key7 value7
1 Key8 value8
1 Key9 value9
1 Key10 value10
1 |
1 |
1 |
1 |
1
1 KeyN valueN
提前致谢。
解决方案
似乎UDTF是适合您的解决方案。您可以将explode视为一个 UDTF 示例,它接收一个数组,然后输出 N 行,每个元素一个。
您的 UDTF 的签名将是类似的:
@Udtf(schema = "STRUCT<key VARCHAR, value VARCHAR>")
public <T> List<Struct> expandMapEntries(final Map<String, String> input) {
// output a list of key value pairs as a struct from 'input'
}
然后您可以使用此 UDTF 并从中选择字段(如下所示):
CREATE STREAM expanded AS SELECT EXPAND_MAP_ENTRIES(KeyValues) AS keyVals FROM source;
CREATE STREAM flattened AS keyVals->key as `KEY`, keyVals->value AS VALUE FROM expanded;
让我知道这是否对您有用,并随时联系社区 slack (@almog) - 我对这个用例非常感兴趣。
推荐阅读
- reactjs - react js中的eventbrite入门
- javascript - 通过 splash 评估 javascript 时如何避免 CSP 错误?
- logging - 具有 ILoggerFactory 和 Autofac 注入服务的自定义 Serilog 丰富器
- python - Python 缩进辅助
- kubernetes-helm - 在 HELM 模板定义函数中使用变量
- firebase - 在多个屏幕中调用相同的 onAuthStateChanged 监听器会创建新的监听器?
- listview - SwipeView 仅在执行滑动后对某些行做出响应!(直到页面手动导航)
- html - 从 jQuery 和 Thymeleaf 中的输入计算值?
- snowflake-cloud-data-platform - 带有更改日志的雪花建模,仅包含新记录和已删除记录
- language-agnostic - 编程世界中“业务逻辑”这个词的词源是什么?