timestamp - kSQL - 按时间戳将表分组为每分钟的字符串
问题描述
我有一个关于 kSQL 的问题。我对 kSQL 很陌生,但在使用 MS SQL Server 方面有一些经验。希望在这里得到一些帮助,因为我不明白为什么事情会发生在 KSQL 中,因为它们目前正在发生。仅供参考:我们在 Confluent (Apache Kafka) Cloud 中使用 kSQLDB。
下面我们来看我的用例:我的团队和我正在使用开源 API 使用一些加密货币数据(价格、市场份额、百分比变化等),我们希望每分钟聚合数据(尤其是价格数据和其他一些数据) .
来自 API 的消息包含以毫秒为单位的时间戳(BIGINT 格式),我们使用 KSQL 在流中将时间戳更改为字符串格式。
在我们的加密货币数据流的最后一步中,我们希望按时间戳(字符串)对 avg(priceusd) 进行分组,以获得每个时间戳的平均价格结果(显示为每分钟的日期和时间)。但是我们的 kSQLDB 中的表总是在每个时间戳(在 group by 子句中)向表生成多于一行。在流式传输实时数据时将 auto.offset.reset 设置为“最新”时尤其会发生这种情况。使用 auto.offset.reset “earliest” 从底层主题加载旧数据工作正常(每个时间戳一个条目,因为它应该通过 group by)。
在这里,我们使用包含 group by 子句的最终表的代码:
CREATE TABLE COINCAP_Table WITH (KAFKA_TOPIC=‘Coincap_Table’, KEY_FORMAT=‘JSON’, PARTITIONS=1, REPLICAS=3, VALUE_FORMAT=‘JSON’) AS SELECT
data->symbol+’,’+TIMESTAMP_FORMATTED as TIMESTAMP_SYMBOL_KEY,
AVG(data->priceusd) as AVG_priceusd,
AVG(data->volumeusd24hr) as AVG_volumeusd24hr,
AVG(data-> CHANGEPERCENT24HR) as AVG_CHANGEPERCENT24HR,
AVG(data->marketcapusd) as AVG_marketCapUsd
FROM COINCAP_STREAM2
GROUP BY data->symbol+’,’+TIMESTAMP_FORMATTED
EMIT CHANGES;
PS:我们已将列合并为一列(符号和时间戳),以便稍后使用合并属性进行连接。但这不是重点。
TIMEStAMP_FORMATTED 从 BIGINT(以毫秒为单位)更改为 STRING,如下所示:
TIMESTAMPTOSTRING(TIMESTAMP, ‘yyyy-MM-dd ‘‘at’’ HH:mm’) as TIMESTAMP_FORMATTED,
有谁知道解决此问题的解决方案,并且 group by 子句中的每个键只能获得一行?为什么 kSQL 为 group by 子句中的每个键属性生成多行(有时是 2 或 3)?
谢谢你的帮助。
最好的,塞巴斯蒂安
解决方案
推荐阅读
- visual-studio - Unity object.transform.position 返回低数字?
- sql - Packer 构建失败并出现 SQL 2014 .\SETUP.EXE 退出,错误代码为 -2068774911 Azure
- kotlin - 限制 Kotlin 中协程的数量
- twilio - 创建房间时如何在 Twillio 可编程视频 statusCallback 中指定 statusCallbackEvent?
- reactjs - 使用反应挂钩 setstate 将数组推送到状态不起作用
- java - 如何在 Java 中使用泛型扩展抽象类?
- javascript - 如何通过以下博主最近发布的 js 和 css 代码获得 First Post Is big?
- python-3.x - XV-11 激光雷达使用 Raspberry pi 3B+ 读取数据
- excel - 如何自动刷新数据透视表?
- tkinter - 你如何在 PyCharm 上安装 tkinter 我已经尝试了所有命令,但这就是我得到的|