apache-kafka - How to construct a nested JSON message on a ksqlDB output topic
Question
From one of the source systems I receive the following event payloads, and Stream1 is created over these JSON payloads.
Event JSON 1
{
"event": {
"header": {
"name":"abc",
"version":"1.0",
"producer":"123",
"channel":"lab",
"countryCode":"US"
},
"body":{"customerIdentifiers":[
{"customerIdentifier":"1234","customerIdType":"cc"},
{"customerIdentifier":"234","customerIdType":"id"}
],
"accountIdentifiers":[
{"accountIdentifier":"123","accountIdType":"no"},
{"accountIdentifier":"Primary","accountIdType":"da"}
],
"eventDetails":{
"offeramount":"40000",
"apr":"2.6%",
"minpayment":"400",
"status":"Approved"
}
}
}
}
Event JSON 2
{
"event": {
"header": {
"name":"abc",
"version":"1.0",
"producer":"123",
"channel":"lab",
"countryCode":"US"
},
"body":{"customerIdentifiers":[
{"customerIdentifier":"1234","customerIdType":"cc"},
{"customerIdentifier":"234","customerIdType":"id"}
],
"accountIdentifiers":[
{"accountIdentifier":"123","accountIdType":"no"},
{"accountIdentifier":"Primary","accountIdType":"da"}
],
"eventDetails":{
"offeramount":"70000",
"apr":"3.6%",
"minpayment":"600",
"status":"Rejected"
}
}
}
}
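For reference, a STREAM1 declaration matching these payloads might look like the sketch below. The topic name 'events' is an assumption, and the numeric-looking fields are declared VARCHAR because the sample payloads quote them. Note that the aggregation queries in this post address columns as status and eventDetails->minpayment directly, so in practice you would either flatten this stream first or adjust the column paths (e.g. event->body->eventDetails->minpayment).

```sql
-- Hypothetical source stream over the raw event topic (topic name assumed).
CREATE STREAM STREAM1 (
  event STRUCT<
    header STRUCT<
      name VARCHAR,
      version VARCHAR,
      producer VARCHAR,
      channel VARCHAR,
      countryCode VARCHAR>,
    body STRUCT<
      customerIdentifiers ARRAY<STRUCT<customerIdentifier VARCHAR, customerIdType VARCHAR>>,
      accountIdentifiers ARRAY<STRUCT<accountIdentifier VARCHAR, accountIdType VARCHAR>>,
      eventDetails STRUCT<
        offeramount VARCHAR,
        apr VARCHAR,
        minpayment VARCHAR,
        status VARCHAR>>>
) WITH (KAFKA_TOPIC='events', VALUE_FORMAT='JSON');
```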
I created the following aggregate table over stream1:
CREATE TABLE EVENT_TABLE AS
SELECT
avg(minpayment) as Avg_MinPayment,
avg(apr) AS Avg_APr,
avg(offeramount) AS Avgofferamount ,
status
FROM STREAM1
GROUP BY status
EMIT CHANGES;
Status   | Avg_MinPayment | Avg_APr | Avgofferamount
----------------------------------------------------
Approved | 400            | 2.6%    | 40000
Rejected | 600            | 3.6%    | 70000
I get the result above from the KTable, and the JSON in the KTable's topic looks like this:
Aggregate JSON 1
PRINT 'EVENT_TABLE';
{
"Status" : "Approved",
"Avg_Minpayment" : "400",
"Avg_APr" : "2.6%",
"offeramount" : "40000"
}
Aggregate JSON 2
{
"Status" : "Rejected",
"Avg_Minpayment" : "600",
"Avg_APr" : "3.6%",
"offeramount" : "70000"
}
But I have to construct and publish a final target JSON on the output topic in the format below, i.e. I have to add the header and body around aggregate JSON 1 and aggregate JSON 2.
{
"event":{
"header":{
"name":"abc",
"version":"1.0",
"producer":"123",
"channel":"lab",
"countryCode":"US"
},
"body":{
"Key":[
{"Status":"approved","Avg_Minpayment":"400","Avg_APr":"2.6%","offeramount":"40000"},
{"Status":"rejected","Avg_Minpayment":"600","Avg_APr":"3.6%","offeramount":"70000"}
]
}
}
}
Answer
Given that your example SQL does not produce your example output from the example input, it's not entirely clear what you're trying to achieve. In fact, your example SQL would fail with an unknown-column error.
Something like the following would generate your example output:
CREATE TABLE EVENT_TABLE AS
SELECT
status,
avg(eventDetails->minpayment) as Avg_MinPayment,
avg(eventDetails->apr) AS Avg_APr,
avg(eventDetails->offeramount) AS Avgofferamount
FROM STREAM1
GROUP BY status
EMIT CHANGES;
Next, your example output...
Status   | Avg_MinPayment | Avg_APr | Avgofferamount
----------------------------------------------------
Approved | 400            | 2.6%    | 40000
Rejected | 600            | 3.6%    | 70000
...contains one row per status. However, the output you say you want to achieve...
{
"event":{
"header":{
"name":"abc",
"version":"1.0",
"producer":"123",
"channel":"lab",
"countryCode":"US"
},
"body":{
"Key":[
{"Status":"approved","Avg_Minpayment":"400","Avg_APr":"2.6%","offeramount":"40000"},
{"Status":"rejected","Avg_Minpayment":"600","Avg_APr":"3.6%","offeramount":"70000"}
]
}
}
}
...contains both statuses, i.e. it combines your two example input messages into a single output.
If I've understood you correctly, and you really do want to output the JSON above, then:
You'll first need to capture the event info in the output. But which event info? If you know it will always be the same, then you can use:
CREATE TABLE EVENT_TABLE AS
SELECT
status,
latest_by_offset(event) as event,
avg(eventDetails->minpayment) as Avg_MinPayment,
avg(eventDetails->apr) AS Avg_APr,
avg(eventDetails->offeramount) AS Avgofferamount
FROM STREAM1
GROUP BY status
EMIT CHANGES;
The latest_by_offset aggregate function will capture the event info from the last message it sees.
Though I'm not convinced this is what you want. Couldn't you receive accepted and rejected messages whose event info differs? If it is the event info that identifies which messages should be grouped together, then something like this may get you close to what you want:
CREATE TABLE EVENT_TABLE AS
SELECT
event,
collect_list(eventDetails) as body
FROM STREAM1
GROUP BY event
EMIT CHANGES;
If that's close, then you may need to make use of the STRUCT constructor and the AS_VALUE function to restructure your output. For example:
CREATE TABLE EVENT_TABLE AS
SELECT
event as key,
AS_VALUE(event) as event,
STRUCT(
keys := collect_list(eventDetails)
) as body
FROM STREAM1
GROUP BY event
EMIT CHANGES;
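Assuming that final query compiles against your schema and that both status records share the same event, each row of EVENT_TABLE would serialize to the sink topic in roughly the shape below. This is a sketch: field-name casing depends on how ksqlDB handles the unquoted aliases, and since the whole event struct is re-emitted via AS_VALUE, its original body fields would also appear under EVENT (omitted here for brevity).

```json
{
  "EVENT": {
    "HEADER": {
      "name": "abc",
      "version": "1.0",
      "producer": "123",
      "channel": "lab",
      "countryCode": "US"
    }
  },
  "BODY": {
    "KEYS": [
      {"offeramount": "40000", "apr": "2.6%", "minpayment": "400", "status": "Approved"},
      {"offeramount": "70000", "apr": "3.6%", "minpayment": "600", "status": "Rejected"}
    ]
  }
}
```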