首页 > 解决方案 > kSQL - 按时间戳将表分组为每分钟的字符串

问题描述

我有一个关于 kSQL 的问题。我对 kSQL 很陌生,但在使用 MS SQL Server 方面有一些经验。希望在这里得到一些帮助,因为我不明白为什么事情会发生在 KSQL 中,因为它们目前正在发生。仅供参考:我们在 Confluent (Apache Kafka) Cloud 中使用 kSQLDB。

下面我们来看我的用例:我的团队和我正在使用开源 API 使用一些加密货币数据(价格、市场份额、百分比变化等),我们希望每分钟聚合数据(尤其是价格数据和其他一些数据) .

来自 API 的消息包含以毫秒为单位的时间戳(BIGINT 格式),我们使用 KSQL 在流中将时间戳更改为字符串格式。

在我们的加密货币数据流的最后一步中,我们希望按时间戳(字符串)对 avg(priceusd) 进行分组,以获得每个时间戳的平均价格结果(显示为每分钟的日期和时间)。但是我们的 kSQLDB 中的表总是在每个时间戳(在 group by 子句中)向表生成多于一行。在流式传输实时数据时将 auto.offset.reset 设置为“最新”时尤其会发生这种情况。使用 auto.offset.reset “earliest” 从底层主题加载旧数据工作正常(每个时间戳一个条目,因为它应该通过 group by)。

在这里,我们使用包含 group by 子句的最终表的代码:

CREATE TABLE COINCAP_Table WITH (KAFKA_TOPIC=‘Coincap_Table’, KEY_FORMAT=‘JSON’, PARTITIONS=1, REPLICAS=3, VALUE_FORMAT=‘JSON’) AS SELECT
data->symbol+’,’+TIMESTAMP_FORMATTED as TIMESTAMP_SYMBOL_KEY,
AVG(data->priceusd) as AVG_priceusd,
AVG(data->volumeusd24hr) as AVG_volumeusd24hr,
AVG(data-> CHANGEPERCENT24HR) as AVG_CHANGEPERCENT24HR,
AVG(data->marketcapusd) as AVG_marketCapUsd
FROM COINCAP_STREAM2
GROUP BY data->symbol+’,’+TIMESTAMP_FORMATTED
EMIT CHANGES;

PS:我们已将列合并为一列(符号和时间戳),以便稍后使用合并属性进行连接。但这不是重点。

TIMEStAMP_FORMATTED 从 BIGINT(以毫秒为单位)更改为 STRING,如下所示:

TIMESTAMPTOSTRING(TIMESTAMP, ‘yyyy-MM-dd ‘‘at’’ HH:mm’) as TIMESTAMP_FORMATTED,

有谁知道解决此问题的解决方案,并且 group by 子句中的每个键只能获得一行?为什么 kSQL 为 group by 子句中的每个键属性生成多行(有时是 2 或 3)?

谢谢你的帮助。

最好的,塞巴斯蒂安

标签: timestampaggregationconfluent-platformksqldbminute

解决方案


推荐阅读