apache-kafka - 当物化视图中出现新的 MAX 值时,具有唯一的 Kafka ksqlDB 事件
问题描述
对 ksqlDB 事件和物化视图感到困惑。当在表格聚合/物化视图中确定出现新的高股价时,我希望在专门的主题/流中得到通知,但我收到的每个事件都报告了高价,而不是仅在出现新的高价事件时.
这是我的工作示例/设置。
为股票创建基础流和主题。
ksql> create stream stocks (symbol VARCHAR KEY, company VARCHAR, price DECIMAL(9, 2))
> WITH (KAFKA_TOPIC='stocks', PARTITIONS=1, VALUE_FORMAT='json');
Message
----------------
Stream created
----------------
添加一些有关 Acme Corp 股票价格的初始数据
ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.11);
ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.12);
ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.13);
打印底层主题以证明数据存在。
ksql> print 'stocks' from beginning limit 3;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/21 14:26:57.169 Z, key: 1094929733, value: {"COMPANY":"Acme Corp","PRICE":111.11}
rowtime: 2021/03/21 14:27:01.717 Z, key: 1094929733, value: {"COMPANY":"Acme Corp","PRICE":111.12}
rowtime: 2021/03/21 14:27:04.546 Z, key: 1094929733, value: {"COMPANY":"Acme Corp","PRICE":111.13}
Topic printing ceased
创建聚合/物化视图以显示最高股票价格。
ksql> create table stock_highs as
> select symbol, max(price) as high
> from stocks
> group by symbol
> emit changes;
Message
--------------------------------------------
Created query with ID CTAS_STOCK_HIGHS_115
--------------------------------------------
查询它以进行目视检查。
ksql> select * from stock_highs where symbol = 'ACME';
+--------------+------------------------+
|SYMBOL |HIGH |
+--------------+------------------------+
|ACME |111.13 |
Query terminated
在单独的终端(终端 2)中使用消费者(又名打印 STOCK_HIGHS)来观察股票高价的变化。
ksql> print STOCK_HIGHS from beginning;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/21 14:27:04.546 Z, key: 1094929733, value: {"HIGH":111.13}
回到带有 ksql 客户端的原始终端(终端 1)插入更多数据以强制更新库存高点。
ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.20);
ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.15);
上面的插入应该给出 111.20 的新高价并忽略 111.15 的价格,因此,在我的思考和用例中,我希望从 STOCK_HIGHS 主题中获得一个事件/消息,显示 111.20 的新高股价,而不是 111.15价格。但是,我在消费者(终端 2)中得到了两个新事件。
ksql> print STOCK_HIGHS from beginning;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/21 14:27:04.546 Z, key: 1094929733, value: {"HIGH":111.13}
rowtime: 2021/03/21 14:37:51.234 Z, key: 1094929733, value: {"HIGH":111.20}
rowtime: 2021/03/21 14:39:03.301 Z, key: 1094929733, value: {"HIGH":111.20}
问题是我真的只想在只有新的高价出现时得到通知或“事件”,这样当新的高价出现时,我可以让消费者离开并做一些有意义的事情。
解决方案
推荐阅读
- kubernetes - gcloud deployment: unable to change dnsconfig (DNSConfig: custom pod DNS is disabled by feature gate)
- twitter - 从数据框中提取包含关键字的行(RStudio 中的 Twitter 数据)
- macros - 在 Google 跟踪代码管理器 (GTM) 数据层中插入唯一代码宏
- android - 为每个应用程序安装生成唯一的加密密钥或盐
- ios - 我可以安全地更改应用的 PRODUCT_NAME 吗?
- react-native - React-Native,如何在警报或 actionsheetIOS 顶部添加视图?
- r - 跨两列创建组键
- firefox - 简单的聚合物示例不适用于 Firefox ESR 52.8.0
- mongoose - 如何真正设置猫鼬默认值,尤其是。布尔值?
- android - dexguard 库无法加密类