apache-kafka - KSQL 一致性
问题描述
我正在使用 dotnet 和 ksql 进行 PoC。 https://github.com/pablocastilla/kafkiano/
总体思路是看能不能用KSQL实现业务逻辑。在示例中,我在库存中引入设备并从中下订单。该示例包括:
两个主要流:
- 库存流接收添加事件到库存。
- 订单流接收产品订单。
使用这些流,我创建了两个表:
- ProductStock:它只是将产品添加到库存中
- 订单:按产品统计订单
在这两个表之后,我创建了另一个表,其中包含订单和库存中产品之间的差异,只是为了知道是否还有产品。
通过加入最后一个表和订单流,我可以在处理该订单时留下库存。
我正在使用产品名称作为键来介绍事件。到目前为止,它在我的机器上运行良好,但我的问题是:
这在大型生产环境中是否一致?我想知道当并行接收大量事件时一致性被破坏的限制。
我如何知道哪些查询在其他人之前执行?在将差异与订单流结合之前,我需要计算库存和订单之间的差异
谢谢
KSQL:
//INVENTORY STREAMS
CREATE STREAM InventoryEventsStream (ProductName VARCHAR, Quantity INT) WITH (kafka_topic='INVENTORYEVENTS', key='ProductName', value_format='json');
//TABLE GROUPING BY PRODUCT
CREATE TABLE ProductsStock as select ProductName,sum(Quantity) as Stock from InventoryEventsStream group by ProductName;
// ORDERS STREAMS
CREATE STREAM OrdersCreatedStream (ProductName VARCHAR,Quantity INT, OrderId VARCHAR, User VARCHAR) WITH (kafka_topic='ORDERSEVENTS', key='ProductName', value_format='json');
//TABLE GROUPING BY PRODUCT
CREATE TABLE ProductsOrdered as select ProductName as ProductName,sum(Quantity) as Orders from ORDERSCREATEDSTREAM group by ProductName;
// join with the difference
CREATE TABLE StockByProductTable AS SELECT ps.ProductName as ProductName,ps.Stock - op.Orders as Stock FROM PRODUCTSORDERED op JOIN ProductsStock ps ON op.ProductName = ps.ProductName;
//logic: I want the stock left when I make an order
SELECT ocs.OrderId,ocs.User,sbpt.Stock FROM OrdersCreatedStream ocs JOIN StockByProductTable sbpt ON sbpt.ProductName = ocs.ProductName;
解决方案
我复制并粘贴了从 confluent 团队获得的github 答案:
“我明白了你的问题。你的问题的最低答案是,只要你的消息在流中可用,它就会执行。
一个很好的类比是一台始终运行的机器。每当有效载荷进入内部时,它都会对其进行处理。现在它涉及到链接部分。您是否在处理后将一些有效负载插入到新的记录流中?那么是的,您可以将其称为“链接”。一旦您运行/执行 CTAS/CSAS 语句,您就会看到类似 'Table/Stream created and Running' 的内容,这正是它的意思。
你点燃了一个一直在运行的查询!”
推荐阅读
- qt - 如何创建基于 QVariant 的通用模型?
- python - 内存上传文件中的django没有以开头的属性
- python-3.x - 根据 pandas 中的行名重新排序列名
- for-loop - 带有文件名配对的 For 循环
- logging - 如何编写 snmptrapd 配置文件?
- html - 分组 if 语句
- sql - MS-Access:在构建数据透视表之前总结 2 个值
- rust - 使用 wasm-bindgen 对大型 rust 对象进行 Js 绑定
- fitnesse - 在 Fitnesse 中,当我尝试运行测试时出现错误,但我的同事能够毫无问题地在相同的脚本上运行测试
- html - 如何将伪元素设置为段落的标题?