apache-kafka - 如何从与 KSQL 窗口表对应的 kafka 主题获取窗口开始或结束时间?
问题描述
我想从 kafka-console-consumer 命令获取窗口开始时间戳(这里是 1530008520000)。
它适用于 KSQL:
ksql> select * from DEV_MONITOR_RULE_2557_104782_233_2_TABLE;
1530008581051 | 2557 : Window{start=1530008520000 end=-} | 2557 | 2 | 1530008581051
但它不适用于 kafka-console-consumer?
./bin/kafka-console-consumer --zookeeper 10.12.0.157:2181 --topic DEV_MONITOR_RULE_2557_104782_233_2_TABLE
{"HITCOUNTS":2,"TENANTID":2557,"HITTIME":1530008581051}
如何从 kafka-console-consumer 打印窗口开始时间(这里是 1530008520000)?
谢谢!
解决方案
窗口开始时间反映在ROWTIME
KSQL 消息和 Kafka 消息的时间戳中。您可以使用标准 API 访问此时间戳以处理 Kafka 消息。
AFAIKkafka-console-consumer
不支持显示时间戳。然而,类似的事情kafkacat
。
这是 KSQL 中的一些聚合数据,ROWTIME
显示了窗口的开头(并TIMESTAMPTOSTRING
用于漂亮地打印它):
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), ROWTIME, STARS, STAR_COUNT FROM RATINGS_AGG2;
2018-06-27 09:30:00 | 1530091800000 | 1 | 2
2018-06-27 09:30:00 | 1530091800000 | 4 | 6
2018-06-27 09:30:00 | 1530091800000 | 2 | 2
2018-06-27 09:30:00 | 1530091800000 | 3 | 3
现在相同的主题kafkacat
:
$ kafkacat -b localhost:9092 -C -K: \
-f '\nTimestamp: %T\t\tValue (%S bytes): %s' \
-t RATINGS_AGG2
Timestamp: 1530091800000 Value (26 bytes): {"STAR_COUNT":1,"STARS":1}
Timestamp: 1530091800000 Value (26 bytes): {"STAR_COUNT":2,"STARS":1}
Timestamp: 1530091800000 Value (26 bytes): {"STAR_COUNT":3,"STARS":3}
Timestamp: 1530091800000 Value (26 bytes): {"STAR_COUNT":5,"STARS":1}
Timestamp: 1530091800000 Value (26 bytes): {"STAR_COUNT":6,"STARS":3}
Timestamp: 1530091800000 Value (26 bytes): {"STAR_COUNT":7,"STARS":1}
Timestamp: 1530091800000 Value (26 bytes): {"STAR_COUNT":7,"STARS":3}
Timestamp: 1530091800000 Value (26 bytes): {"STAR_COUNT":9,"STARS":1}
Timestamp: 1530091800000 Value (26 bytes): {"STAR_COUNT":9,"STARS":3}
推荐阅读
- random-forest - Time Weighted Samples when using Random Forest
- asp.net-web-api - 我不能说我的 .net 核心项目中缺少什么……我的 Intellisense 不能正常工作还是什么原因?
- python - Update a table when other tables changed in Django ORM
- php - How to properly format an array?
- python - Speed up date columns conversion (pandas) from string to datetime
- javascript - Create a immutable copy of a property
- python - Python3 multiprocessing a 'for' loop
- java - 由于 build.gradle Android Studio 中的错误,应用程序崩溃
- github - 将史诗游戏帐户与 github 连接
- r - 安装 sparklyr 版本 0.8.4 时出错