apache-kafka - KSQL 流 - 从结构数组中获取数据
问题描述
我的 JSON 看起来像:
{
"Obj1": {
"a": "abc",
"b": "def",
"c": "ghi"
},
"ArrayObj": [
{
"key1": "1",
"Key2": "2",
"Key3": "3",
},
{
"key1": "4",
"Key2": "5",
"Key3": "6",
},
{
"key1": "7",
"Key2": "8",
"Key3": "9",
}
]
}
我已经编写了 KSQL 流以将其转换为 AVRO 并保存到主题,以便我可以将其推送到 JDBC Sink 连接器
CREATE STREAM Example1(ArrayObj ARRAY<STRUCT<key1 VARCHAR, Key2 VARCHAR>>,Obj1 STRUCT<a VARCHAR>)WITH(kafka_topic='sample_topic', value_format='JSON');
CREATE STREAM Example_Avro WITH(VALUE_FORMAT='avro') AS SELECT e.ArrayObj[0] FROM Example1 e;
在 Example_Avro 中,我只能获取数组中的第一个对象。
当我在 KSQL 中从 Example_Avro 中点击 select * 时,如何获得如下所示的数据?
a b key1 key2 key3
abc def 1 2 3
abc def 4 5 6
abc def 7 8 9
解决方案
测试数据(我删除了值后面的无效逗号key3
):
ksql> PRINT test4;
Format:JSON
1/9/20 7:45:18 PM UTC , NULL , { "Obj1": { "a": "abc", "b": "def", "c": "ghi" }, "ArrayObj": [ { "key1": "1", "Key2": "2", "Key3": "3" }, { "key1": "4", "Key2": "5", "Key3": "6" }, { "key1": "7", "Key2": "8", "Key3": "9" } ] }
询问:
SELECT OBJ1->A AS A,
OBJ1->B AS B,
EXPLODE(ARRAYOBJ)->KEY1 AS KEY1,
EXPLODE(ARRAYOBJ)->KEY2 AS KEY2,
EXPLODE(ARRAYOBJ)->KEY3 AS KEY3
FROM TEST4
EMIT CHANGES;
结果:
+-------+-------+------+-------+-------+
|A |B |KEY1 |KEY2 |KEY3 |
+-------+-------+------+-------+-------+
|abc |def |1 |2 |3 |
|abc |def |4 |5 |6 |
|abc |def |7 |8 |9 |
在 ksqlDB 0.6 上测试,其中EXPLODE
添加了该功能。
推荐阅读
- html - LogicException:您必须在转换之前定义一个二进制文件
- javascript - 在 react 中解析 json url 并获取元素的值
- javascript - 我不明白为什么我的 billAmount 值没有打印在 conole 上
- node.js - 运行lua时终端限制cpu使用
- docker - docker 丢弃来自同一子网的流量
- html - 单选按钮背景图像不在 chrome 中居中
- php - Google 客户端 ID 未在网络和移动设备上捕获
- xlsx - 使用 writexl 包导出包含实数和复数的数据帧
- python - SQL 说我给出了三个参数,但据我所知,我只给出了两个
- python - TkInter:两个按钮之间的传输变量