首页 > 解决方案 > 当物化视图中出现新的 MAX 值时,具有唯一的 Kafka ksqlDB 事件

问题描述

对 ksqlDB 事件和物化视图感到困惑。当在表格聚合/物化视图中确定出现新的高股价时,我希望在专门的主题/流中得到通知,但我收到的每个事件都报告了高价,而不是仅在出现新的高价事件时.

这是我的工作示例/设置。

为股票创建基础流和主题。

ksql> create stream stocks (symbol VARCHAR KEY, company VARCHAR, price DECIMAL(9, 2))
>  WITH (KAFKA_TOPIC='stocks', PARTITIONS=1, VALUE_FORMAT='json');

Message
----------------
Stream created
----------------

添加一些有关 Acme Corp 股票价格的初始数据

ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.11);
ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.12);
ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.13);

打印底层主题以证明数据存在。

ksql> print 'stocks' from beginning limit 3;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/21 14:26:57.169 Z, key: 1094929733, value: {"COMPANY":"Acme Corp","PRICE":111.11}
rowtime: 2021/03/21 14:27:01.717 Z, key: 1094929733, value: {"COMPANY":"Acme Corp","PRICE":111.12}
rowtime: 2021/03/21 14:27:04.546 Z, key: 1094929733, value: {"COMPANY":"Acme Corp","PRICE":111.13}
Topic printing ceased

创建聚合/物化视图以显示最高股票价格。

ksql> create table stock_highs as
>  select symbol, max(price) as high
>  from stocks
>  group by symbol
>  emit changes;

 Message
--------------------------------------------
 Created query with ID CTAS_STOCK_HIGHS_115
--------------------------------------------

查询它以进行目视检查。

ksql> select * from stock_highs where symbol = 'ACME';
+--------------+------------------------+
|SYMBOL        |HIGH                    |
+--------------+------------------------+
|ACME          |111.13                  |
Query terminated

在单独的终端(终端 2)中使用消费者(又名打印 STOCK_HIGHS)来观察股票高价的变化。

ksql> print STOCK_HIGHS from beginning;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/21 14:27:04.546 Z, key: 1094929733, value: {"HIGH":111.13}

回到带有 ksql 客户端的原始终端(终端 1)插入更多数据以强制更新库存高点。

ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.20);
ksql> insert into stocks (symbol, company, price) values ('ACME', 'Acme Corp', 111.15);

上面的插入应该给出 111.20 的新高价并忽略 111.15 的价格,因此,在我的思考和用例中,我希望从 STOCK_HIGHS 主题中获得一个事件/消息,显示 111.20 的新高股价,而不是 111.15价格。但是,我在消费者(终端 2)中得到了两个新事件。

ksql> print STOCK_HIGHS from beginning;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/21 14:27:04.546 Z, key: 1094929733, value: {"HIGH":111.13}
rowtime: 2021/03/21 14:37:51.234 Z, key: 1094929733, value: {"HIGH":111.20}
rowtime: 2021/03/21 14:39:03.301 Z, key: 1094929733, value: {"HIGH":111.20}

问题是我真的只想在只有新的高价出现时得到通知或“事件”,这样当新的高价出现时,我可以让消费者离开并做一些有意义的事情。

标签: apache-kafkaksqldbevent-driven-design

解决方案


推荐阅读