首页 > 解决方案 > KSQL 一致性

问题描述

我正在使用 dotnet 和 ksql 进行 PoC。 https://github.com/pablocastilla/kafkiano/

总体思路是看能不能用KSQL实现业务逻辑。在示例中,我在库存中引入设备并从中下订单。该示例包括:

两个主要流:

使用这些流,我创建了两个表:

在这两个表之后,我创建了另一个表,其中包含订单和库存中产品之间的差异,只是为了知道是否还有产品。

通过加入最后一个表和订单流,我可以在处理该订单时留下库存。

我正在使用产品名称作为键来介绍事件。到目前为止,它在我的机器上运行良好,但我的问题是:

谢谢

KSQL:
//INVENTORY STREAMS
CREATE STREAM InventoryEventsStream (ProductName VARCHAR, Quantity INT) WITH (kafka_topic='INVENTORYEVENTS', key='ProductName', value_format='json');

//TABLE GROUPING BY PRODUCT
CREATE TABLE  ProductsStock as select ProductName,sum(Quantity) as Stock from InventoryEventsStream group by ProductName;

// ORDERS STREAMS
CREATE STREAM OrdersCreatedStream (ProductName VARCHAR,Quantity INT, OrderId VARCHAR, User VARCHAR) WITH (kafka_topic='ORDERSEVENTS', key='ProductName', value_format='json');
//TABLE GROUPING BY PRODUCT
CREATE TABLE ProductsOrdered as select ProductName as ProductName,sum(Quantity) as Orders from  ORDERSCREATEDSTREAM group by ProductName;

// join with the difference
CREATE TABLE StockByProductTable  AS  SELECT ps.ProductName as ProductName,ps.Stock - op.Orders as Stock FROM PRODUCTSORDERED op JOIN ProductsStock ps ON op.ProductName = ps.ProductName;

//logic: I want the stock left when I make an order
SELECT ocs.OrderId,ocs.User,sbpt.Stock FROM OrdersCreatedStream ocs JOIN  StockByProductTable sbpt ON sbpt.ProductName = ocs.ProductName;

标签: apache-kafkaksqldb

解决方案


我复制并粘贴了从 confluent 团队获得的github 答案:

“我明白了你的问题。你的问题的最低答案是,只要你的消息在流中可用,它就会执行。

一个很好的类比是一台始终运行的机器。每当有效载荷进入内部时,它都会对其进行处理。现在它涉及到链接部分。您是否在处理后将一些有效负载插入到新的记录流中?那么是的,您可以将其称为“链接”。一旦您运行/执行 CTAS/CSAS 语句,您就会看到类似 'Table/Stream created and Running' 的内容,这正是它的意思。

你点燃了一个一直在运行的查询!”


推荐阅读