KSQL stream - get data from an array of structs

Question

My JSON looks like this:

{
  "Obj1": {
    "a": "abc",
    "b": "def",
    "c": "ghi"
  },
  "ArrayObj": [
    {
      "key1": "1",
      "Key2": "2",
      "Key3": "3",

    },
    {
      "key1": "4",
      "Key2": "5",
      "Key3": "6",

    },
    {
      "key1": "7",
      "Key2": "8",
      "Key3": "9",

    }
  ]

}

I have written KSQL streams that convert it to Avro and write it to a topic, so that I can push it to the JDBC Sink connector:

CREATE STREAM Example1(ArrayObj ARRAY<STRUCT<key1 VARCHAR, Key2 VARCHAR>>,Obj1 STRUCT<a VARCHAR>)WITH(kafka_topic='sample_topic', value_format='JSON');
CREATE STREAM Example_Avro WITH(VALUE_FORMAT='avro') AS SELECT e.ArrayObj[0] FROM Example1 e; 

In Example_Avro I only get the first object in the array.

How can I get data like the following when I run SELECT * against Example_Avro in KSQL?

  a    b   key1   key2  key3

  abc  def   1       2     3
  abc  def   4       5     6
  abc  def   7       8     9

Tags: apache-kafka, ksqldb, confluent-platform

Answer


Test data (I removed the invalid trailing commas after the Key3 values):

ksql> PRINT test4;
Format:JSON
1/9/20 7:45:18 PM UTC , NULL , { "Obj1": { "a": "abc", "b": "def", "c": "ghi" }, "ArrayObj": [ { "key1": "1", "Key2": "2", "Key3": "3" }, { "key1": "4", "Key2": "5", "Key3": "6" }, { "key1": "7", "Key2": "8", "Key3": "9" } ] }
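The answer does not show how the TEST4 stream was declared. The question's Example1 declares only key1/Key2 inside the array and only a inside Obj1, so a query that reads OBJ1->B or KEY3 needs a schema that includes those fields. A minimal sketch of such a declaration, assuming the stream sits on the test4 topic printed above (the mapping of the TEST4 stream to that topic is an assumption); JSON field names such as "Key2" and "key1" resolve to the upper-cased column names, as the result below shows:

```sql
-- Assumed declaration of TEST4 over the 'test4' topic (not shown in the original answer).
-- Every field the query reads (B, KEY3) must be declared here, unlike in Example1.
CREATE STREAM TEST4 (
    OBJ1     STRUCT<A VARCHAR, B VARCHAR, C VARCHAR>,
    ARRAYOBJ ARRAY<STRUCT<KEY1 VARCHAR, KEY2 VARCHAR, KEY3 VARCHAR>>
) WITH (KAFKA_TOPIC='test4', VALUE_FORMAT='JSON');
```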

Query:

SELECT OBJ1->A AS A, 
       OBJ1->B AS B, 
       EXPLODE(ARRAYOBJ)->KEY1 AS KEY1,
       EXPLODE(ARRAYOBJ)->KEY2 AS KEY2, 
       EXPLODE(ARRAYOBJ)->KEY3 AS KEY3 
FROM   TEST4 
EMIT CHANGES;

Result:

+-------+-------+------+-------+-------+
|A      |B      |KEY1  |KEY2   |KEY3   |
+-------+-------+------+-------+-------+
|abc    |def    |1     |2      |3      |
|abc    |def    |4     |5      |6      |
|abc    |def    |7     |8      |9      |
+-------+-------+------+-------+-------+

Tested on ksqlDB 0.6, the release that added the EXPLODE function.
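The query above is a transient push query, while the question wants the flattened rows in an Avro stream that a JDBC sink can read. A sketch of the same SELECT wrapped in a persistent CREATE STREAM ... AS SELECT, assuming a source stream declared with all five fields (here TEST4; the output name EXAMPLE_AVRO mirrors the question's Example_Avro):

```sql
-- Persistent query: emits one Avro row per element of ARRAYOBJ.
-- The EXPLODE calls on the same array are paired up by position,
-- so KEY1, KEY2 and KEY3 on each output row come from the same struct.
CREATE STREAM EXAMPLE_AVRO WITH (VALUE_FORMAT='AVRO') AS
  SELECT OBJ1->A AS A,
         OBJ1->B AS B,
         EXPLODE(ARRAYOBJ)->KEY1 AS KEY1,
         EXPLODE(ARRAYOBJ)->KEY2 AS KEY2,
         EXPLODE(ARRAYOBJ)->KEY3 AS KEY3
  FROM TEST4;
```

Running SELECT * FROM EXAMPLE_AVRO EMIT CHANGES; should then return one row per array element, as in the result table, alongside any system columns the ksqlDB version adds.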

