首页 > 解决方案 > 如何在 ksql 中解析 Tumbling json 密钥格式

问题描述

我正在使用 ksql 创建一个翻转窗口表,如下所示:

CREATE TABLE payments_min_acc_dev_agg AS SELECT accountId, deviceId, entity, COUNT(*) AS trans_count, sum(amount) AS trans_sum, 'Minute' AS window_type FROM payments_account_stream WINDOW TUMBLING (SIZE 1 MINUTE) GROUP BY accountId, deviceId, entity;

生成到主题的示例输出是:

ksql> print PAYMENTS_MIN_ACC_DEV_AGG from beginning limit 10;
Key format: HOPPING(JSON) or TUMBLING(JSON) or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING)
Value format: JSON or KAFKA_STRING
rowtime: 2021/11/05 05:41:32.645 Z, key: [{"ACCOUNTID":"account1","DEVICEID":"device27","ENTITY":"Account"}@1636090860000/-], value: {"TRANS_COUNT":1,"TRANS_SUM":0.9183311577480634,"WINDOW_TYPE":"Minute"}, partition: 0
rowtime: 2021/11/05 05:41:33.248 Z, key: [{"ACCOUNTID":"account5","DEVICEID":"device3","ENTITY":"Account"}@1636090860000/-], value: {"TRANS_COUNT":1,"TRANS_SUM":0.14000975612417166,"WINDOW_TYPE":"Minute"}, partition: 0

我想设置一个消费者,它将使用keyvalue进行一些处理。我知道value可以反序列化为 json 字符串,但不确定如何反序列化key

标签: jsonapache-kafkaksqldb

解决方案


推荐阅读