apache-kafka - 数组的双重展平。Ksqldb 0.8.1
问题描述
数据。
kafkacat -b 127.0.0.1 -t group-topic -P
{"groups":[{"name":"Roberth","surname":"Smith","origin":"England","albums":["Wish","Desintegration"],"group":"The Cure"},{"name":"Peter","surname":"Murphy","origin":"England","albums":["Mask","In The Flat Field"],"group":"Bauhaus"}]};
// 结构流
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM GROUPS_01
(groups ARRAY<STRUCT<
albums ARRAY<VARCHAR>,
name VARCHAR,
surname VARCHAR
>>)
WITH (kafka_topic='group-topic', value_format='JSON');
SELECT
EXPLODE(groups)->name AS name,
EXPLODE(groups)->surname AS surname,
EXPLODE(groups)->albums AS albums
FROM GROUPS_01
EMIT CHANGES;
// 我有
NAME SURNAME ALBUMS
Roberth Smith [Wish,Desintegration]
Peter Murphy [Mask,In The Flat Field]
// 我需要
NAME SURNAME ALBUM
Roberth Smith Wish
Roberth Smith Desintegration
Peter Murphy Mask
Peter Murphy In The Flat Field
// 尝试
EXPLODE(groups)->EXPLODE(albums)->album AS album
EXPLODE(albums)->album AS album
解决方案
为清楚起见,这是您提供的源数据:
{
"groups": [
{
"name": "Roberth",
"surname": "Smith",
"origin": "England",
"albums": [
"Wish",
"Desintegration"
],
"group": "The Cure"
},
{
"name": "Peter",
"surname": "Murphy",
"origin": "England",
"albums": [
"Mask",
"In The Flat Field"
],
"group": "Bauhaus"
}
]
}
先爆出根数组
ksql> CREATE STREAM EX1A AS SELECT EXPLODE(GROUPS) AS GROUP_SINGLE FROM GROUPS_01 EMIT CHANGES;
Message
-----------------------------------
Created query with ID CSAS_EX1A_5
-----------------------------------
这给了我们:
ksql> SELECT * FROM EX1A EMIT CHANGES;
+----------------+-------+-----------------------------------------------------------+
|ROWTIME |ROWKEY |GROUP_SINGLE |
+----------------+-------+-----------------------------------------------------------+
|1585666857714 |null |{ALBUMS=[Wish, Desintegration], NAME=Roberth, SURNAME=Smith|
| | |} |
|1585666857714 |null |{ALBUMS=[Mask, In The Flat Field], NAME=Peter, SURNAME=Murp|
| | |hy} |
现在使用->
操作符访问嵌套结构并展开ALBUMS
数组:
CREATE STREAM ALBUMS_EXPLODED AS
SELECT GROUP_SINGLE->NAME AS NAME,
GROUP_SINGLE->SURNAME AS SURNAME,
EXPLODE(GROUP_SINGLE->ALBUMS) AS ALBUM
FROM EX1A
EMIT CHANGES;
ksql> SELECT NAME, SURNAME, ALBUM FROM ALBUMS_EXPLODED EMIT CHANGES;
+-------------------+----------------------+-------------------+
|NAME |SURNAME |ALBUM |
+-------------------+----------------------+-------------------+
|Roberth |Smith |Wish |
|Roberth |Smith |Desintegration |
|Peter |Murphy |Mask |
|Peter |Murphy |In The Flat Field |
推荐阅读
- jenkins - 尝试在同一台服务器上连接 Jenkins 和 Azure DevOps 时出错
- python - JSONDecodeError 期望值:第 1 行第 1 列 (char 0) 与空数组响应
- javascript - 具有某些条件的数字范围的正则表达式
- mysql - 错误代码:1265。列的数据被截断
- python - 我如何运行长度编码模式而不是字符?
- spring-kafka - 如何部署处于暂停模式的 kafka 消费者,直到我发出开始使用消息的信号
- python - Keras - 当 returnSequence 为 True 时,尺寸必须相等
- php - WordPress - 在您的自定义主题中使用 Ultimate Member 插件
- javascript - 元掩码中是否有加速事务的回调函数?
- mysql - 在一个查询中获取总行数和最后一个 id