首页 > 解决方案 > 如何在 KSQLDB 的输出主题上构造嵌套的 JSON 消息

问题描述

从其中一个源系统中,我收到了以下事件有效负载

为以下 json 有效负载创建 Stream1

事件 JSON 1

{
 "event": {
  "header": {
   "name":"abc",
   "version":"1.0",
   "producer":"123",
   "channel":"lab",
   "countryCode":"US"
  },
  "body":{"customerIdentifiers":[
   {"customerIdentifier":"1234","customerIdType":"cc"},
   {"customerIdentifier":"234","customerIdType":"id"}
  ],
  "accountIdentifiers":[
   {"accountIdentifier":"123","accountIdType":"no"},
   {"accountIdentifier":"Primary","accountIdType":"da"}
  ],
  "eventDetails":{
   "offeramount":"40000",
   "apr":"2.6%",
   "minpayment":"400",
   "status":"Approved"
  }
 }
}

事件 JSON 2

{
 "event": {
  "header": {
   "name":"abc",
   "version":"1.0",
   "producer":"123",
   "channel":"lab",
   "countryCode":"US"
  },
  "body":{"customerIdentifiers":[
   {"customerIdentifier":"1234","customerIdType":"cc"},
   {"customerIdentifier":"234","customerIdType":"id"}
  ],
  "accountIdentifiers":[
   {"accountIdentifier":"123","accountIdType":"no"},
   {"accountIdentifier":"Primary","accountIdType":"da"}
  ],
  "eventDetails":{
   "offeramount":"70000",
   "apr":"3.6%",
   "minpayment":"600",
   "status":"Rejected"
  }
 }
}

我在上面的 stream1 上创建了聚合表

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    avg(minpayment) as Avg_MinPayment, 
    avg(apr) AS Avg_APr, 
    avg(offeramount) AS Avgofferamount , 
    status 
  FROM STREAM1 
  GROUP BY status 
  EMIT CHANGES;
Status | Avg_MinPayment | Avg_APr | Avgofferamount 
-----------------------------------------
Approved | 400 | 2.6% | 40000

Rejected | 600 | 3.6% | 70000

我从 KTable 和 KTable Topic json 得到了上面的结果,看起来像这样

聚合 JSON1

打印“事件表”;

{
  "Status" : "Approved", 
  "Avg_Minpayment" : "400", 
  "Avg_APr" : "2.6%", 
  "offeramount" : "40000"
}

聚合 JSON2

{
  "Status" : "Rejected", 
  "Avg_Minpayment" : "600", 
  "Avg_APr" : "3.6%", 
  "offeramount" : "70000"
}

但我必须在输出主题上构建并发布最终目标 json,如下 json 格式。我必须将标题和正文添加到聚合 json1 和聚合 json2。

{
 "event":{
  "header":{
   "name":"abc",
   "version":"1.0",
   "producer":"123",
   "channel":"lab",
   "countryCode":"US"
  },
  "body":{
   "Key":[
{"Status":"approved","Avg_Minpayment":"400","Avg_APr":"2.6%","offeramount":"40000"},
{"Status":"rejected","Avg_Minpayment":"600","Avg_APr":"3.6%","offeramount":"70000"}
  ]
 }
}

标签: apache-kafkaksqldb

解决方案


鉴于您的示例 SQL 不会产生示例输出,给定示例输入,因此您要实现的目标并不十分清楚。事实上,您的示例 SQL 会因未知列错误而失败。

类似以下内容生成您的示例输出:

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    status,
    avg(eventDetails->minpayment) as Avg_MinPayment, 
    avg(eventDetails->apr) AS Avg_APr, 
    avg(eventDetails->offeramount) AS Avgofferamount
  FROM STREAM1 
  GROUP BY status 
  EMIT CHANGES;

接下来,您的示例输出...

Status | Avg_MinPayment | Avg_APr | Avgofferamount 
-----------------------------------------
Approved | 400 | 2.6% | 40000

Rejected | 600 | 3.6% | 70000

...每个状态输出一行。然而,你说你想要实现的输出......

{
 "event":{
  "header":{
   "name":"abc",
   "version":"1.0",
   "producer":"123",
   "channel":"lab",
   "countryCode":"US"
  },
  "body":{
   "Key":[
{"Status":"approved","Avg_Minpayment":"400","Avg_APr":"2.6%","offeramount":"40000"},
{"Status":"rejected","Avg_Minpayment":"600","Avg_APr":"3.6%","offeramount":"70000"}
  ]
 }
}

...包含两种状态,即将您的两个示例输入消息组合成一个输出。

如果我对您的理解正确,并且您确实想输出上述 JSON,那么:

您首先需要包含这些event信息。但是哪些事件信息?如果您知道它们总是相同的,那么您可以使用:

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    status,
    latest_by_offset(event) as event,
    avg(eventDetails->minpayment) as Avg_MinPayment, 
    avg(eventDetails->apr) AS Avg_APr, 
    avg(eventDetails->offeramount) AS Avgofferamount
  FROM STREAM1 
  GROUP BY status 
  EMIT CHANGES;

聚合函数将从它看到的最后一条消息中捕获latest_by_offset信息。event虽然我不相信这是你想要的。您不能收到其他信息不同rejected的消息吗?如果它是标识哪些消息应该组合在一起的信息,那么这样的事情可能会给你一些接近你想要的东西:accepted eventevent

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    event,
    collect_list(eventDetails) as body
  FROM STREAM1 
  GROUP BY event 
  EMIT CHANGES;

如果这很接近,那么您可能需要使用STRUCT构造函数和AS_VALUE函数来重构您的输出。例如:

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    event as key,
    AS_VALUE(event) as event,
    STRUCT(
      keys := collect_list(eventDetails)
    ) as body
  FROM STREAM1 
  GROUP BY event 
  EMIT CHANGES;

推荐阅读