apache-kafka - Aggregating over multiple fields and mapping to a single result
Problem description
For data flowing out of a ticketing system, we are trying to achieve the following:
get the number of open tickets grouped by status and customer. The simplified schema looks like this:
Field | Type
-------------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ID | BIGINT
TICKET_ID | BIGINT
STATUS | VARCHAR(STRING)
TICKETCATEGORY_ID | BIGINT
SUBJECT | VARCHAR(STRING)
PRIORITY | VARCHAR(STRING)
STARTTIME | BIGINT
ENDTIME | BIGINT
CHANGETIME | BIGINT
REMINDTIME | BIGINT
DEADLINE | INTEGER
CONTACT_ID | BIGINT
We want to use this data to get, for each customer, the number of tickets in each status (open, waiting, processing, and so on). This data has to end up as a single message in another topic; its schema might look like this:
Field | Type
-------------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
CONTACT_ID | BIGINT
COUNT_OPEN | BIGINT
COUNT_WAITING | BIGINT
COUNT_CLOSED | BIGINT
We plan to use this data, together with other data, to enrich the customer information and publish the enriched data set to an external system (e.g. Elasticsearch).
Getting the first part is quite easy: group the tickets by customer and status.
select contact_id, status, count(*) cnt from tickets group by contact_id, status;
But now we are stuck: we get multiple rows/messages per customer, and we just don't know how to turn them into a single message keyed by contact_id.
We tried joins, but all of our attempts led nowhere.
Example
Create a table of all tickets in status "waiting", grouped by customer:
create table waiting_tickets_by_cust with (partitions=12,value_format='AVRO')
as select contact_id, count(*) cnt from tickets where status='waiting' group by contact_id;
Rekey that table for the join:
CREATE TABLE T_WAITING_REKEYED WITH (KAFKA_TOPIC='WAITING_TICKETS_BY_CUST',
VALUE_FORMAT='AVRO',
KEY='contact_id');
Left (outer) joining that table with our customers table gives us all customers that have waiting tickets:
select c.id,w.cnt wcnt from T_WAITING_REKEYED w left join CRM_CONTACTS c on w.contact_id=c.id;
But we would need all customers, with the waiting count NULLed where absent, in order to use that result in a further join against the tickets in status PROCESSING. Since we only have the customers that have waiting tickets, we only get customers that have a value for both statuses.
ksql> select c.*,t.cnt from T_PROCESSING_REKEYED t left join cust_ticket_tmp1 c on t.contact_id=c.id;
null | null | null | null | 1
1555261086669 | 1472 | 1472 | 0 | 1
1555261086669 | 1472 | 1472 | 0 | 1
null | null | null | null | 1
1555064371937 | 1474 | 1474 | 1 | 1
null | null | null | null | 1
1555064371937 | 1474 | 1474 | 1 | 1
null | null | null | null | 1
null | null | null | null | 1
null | null | null | null | 1
1555064372018 | 3 | 3 | 5 | 6
1555064372018 | 3 | 3 | 5 | 6
So what is the right way to do this?
This is KSQL 5.2.1.
Thanks.
Edit:
Here is some sample data.
Created a topic that limits the data to a test account:
CREATE STREAM tickets_filtered
WITH (
PARTITIONS=12,
VALUE_FORMAT='JSON') AS
SELECT id,
contact_id,
subject,
status,
TIMESTAMPTOSTRING(changetime, 'yyyy-MM-dd HH:mm:ss.SSS') AS timestring
FROM tickets where contact_id=1472
PARTITION BY contact_id;
00:06:44 1 $ kafkacat-dev -C -o beginning -t TICKETS_FILTERED
{"ID":2216,"CONTACT_ID":1472,"SUBJECT":"Test Bodenbach","STATUS":"closed","TIMESTRING":"2012-11-08 10:34:30.000"}
{"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"processing","TIMESTRING":"2019-04-16 23:52:08.000"}
Changing and adding something in the ticketing-system...
{"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-17 00:10:38.000"}
{"ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"new","TIMESTRING":"2019-04-17 00:11:23.000"}
{"ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"close-request","TIMESTRING":"2019-04-17 00:12:04.000"}
From this data we want to build a topic where the messages look like this:
{"CONTACT_ID":1472,"TICKETS_CLOSED":1,"TICKET_WAITING":1,"TICKET_CLOSEREQUEST":1,"TICKET_PROCESSING":0}
Solution
(Also written up here.)
You can do this by building a table (holding the current status of each ticket) and then aggregating over that table.
Set up the test data:
kafkacat -b localhost -t tickets -P <<EOF
{"ID":2216,"CONTACT_ID":1472,"SUBJECT":"Test Bodenbach","STATUS":"closed","TIMESTRING":"2012-11-08 10:34:30.000"}
{"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"processing","TIMESTRING":"2019-04-16 23:52:08.000"}
{"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-17 00:10:38.000"}
{"ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"new","TIMESTRING":"2019-04-17 00:11:23.000"}
{"ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"close-request","TIMESTRING":"2019-04-17 00:12:04.000"}
EOF
Preview the topic data:
ksql> PRINT 'tickets' FROM BEGINNING;
Format:JSON
{"ROWTIME":1555511270573,"ROWKEY":"null","ID":2216,"CONTACT_ID":1472,"SUBJECT":"Test Bodenbach","STATUS":"closed","TIMESTRING":"2012-11-08 10:34:30.000"}
{"ROWTIME":1555511270573,"ROWKEY":"null","ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ROWTIME":1555511270573,"ROWKEY":"null","ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"processing","TIMESTRING":"2019-04-16 23:52:08.000"}
{"ROWTIME":1555511270573,"ROWKEY":"null","ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-17 00:10:38.000"}
{"ROWTIME":1555511270573,"ROWKEY":"null","ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"new","TIMESTRING":"2019-04-17 00:11:23.000"}
{"ROWTIME":1555511270573,"ROWKEY":"null","ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"close-request","TIMESTRING":"2019-04-17 00:12:04.000"}
Register the stream:
CREATE STREAM TICKETS (ID INT, CONTACT_ID VARCHAR, SUBJECT VARCHAR, STATUS VARCHAR, TIMESTRING VARCHAR) WITH (KAFKA_TOPIC='tickets', VALUE_FORMAT='JSON');
Query the data:
ksql> SET 'auto.offset.reset' = 'earliest';
ksql> SELECT * FROM TICKETS;
1555502643806 | null | 2216 | 1472 | Test Bodenbach | closed | 2012-11-08 10:34:30.000
1555502643806 | null | 8945 | 1472 | sync-test | waiting | 2019-04-16 23:07:01.000
1555502643806 | null | 8945 | 1472 | sync-test | processing | 2019-04-16 23:52:08.000
1555502643806 | null | 8945 | 1472 | sync-test | waiting | 2019-04-17 00:10:38.000
1555502643806 | null | 8952 | 1472 | another sync ticket | new | 2019-04-17 00:11:23.000
1555502643806 | null | 8952 | 1472 | another sync ticket | close-request | 2019-04-17 00:12:04.000
At this point we can pivot the aggregate using CASE:
SELECT CONTACT_ID,
       SUM(CASE WHEN STATUS='new' THEN 1 ELSE 0 END) AS TICKETS_NEW,
       SUM(CASE WHEN STATUS='processing' THEN 1 ELSE 0 END) AS TICKETS_PROCESSING,
       SUM(CASE WHEN STATUS='waiting' THEN 1 ELSE 0 END) AS TICKETS_WAITING,
       SUM(CASE WHEN STATUS='close-request' THEN 1 ELSE 0 END) AS TICKETS_CLOSEREQUEST,
       SUM(CASE WHEN STATUS='closed' THEN 1 ELSE 0 END) AS TICKETS_CLOSED
  FROM TICKETS
 GROUP BY CONTACT_ID;

1472 | 1 | 1 | 2 | 1 | 1
However, you'll notice that the answer is not as expected. That is because we are counting all six input events.
Let's look at a single ticket, ID 8945. It went through three status changes (waiting -> processing -> waiting), and every one of them is included in the aggregate. We can verify this with a simple predicate:
SELECT CONTACT_ID,
       SUM(CASE WHEN STATUS='new' THEN 1 ELSE 0 END) AS TICKETS_NEW,
       SUM(CASE WHEN STATUS='processing' THEN 1 ELSE 0 END) AS TICKETS_PROCESSING,
       SUM(CASE WHEN STATUS='waiting' THEN 1 ELSE 0 END) AS TICKETS_WAITING,
       SUM(CASE WHEN STATUS='close-request' THEN 1 ELSE 0 END) AS TICKETS_CLOSEREQUEST,
       SUM(CASE WHEN STATUS='closed' THEN 1 ELSE 0 END) AS TICKETS_CLOSED
  FROM TICKETS
 WHERE ID=8945
 GROUP BY CONTACT_ID;

1472 | 0 | 1 | 2 | 0 | 0
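This over-counting is easy to reproduce outside of KSQL. A minimal Python sketch (events copied from the sample data above) applies the same per-event counting and shows ticket 8945's repeated "waiting" state being counted twice:

```python
from collections import Counter

# One record per status-change event, as in the TICKETS stream above.
events = [
    {"ID": 2216, "CONTACT_ID": 1472, "STATUS": "closed"},
    {"ID": 8945, "CONTACT_ID": 1472, "STATUS": "waiting"},
    {"ID": 8945, "CONTACT_ID": 1472, "STATUS": "processing"},
    {"ID": 8945, "CONTACT_ID": 1472, "STATUS": "waiting"},
    {"ID": 8952, "CONTACT_ID": 1472, "STATUS": "new"},
    {"ID": 8952, "CONTACT_ID": 1472, "STATUS": "close-request"},
]

# Aggregating over the raw event stream counts every event, so ticket
# 8945's three status changes are all included in the totals.
per_contact = Counter((e["CONTACT_ID"], e["STATUS"]) for e in events)
print(per_contact[(1472, "waiting")])  # 2 -- one ticket, counted twice
```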
What we really want is the current state of each ticket, so repartition the data on the ticket ID:
CREATE STREAM TICKETS_BY_ID AS
  SELECT * FROM TICKETS PARTITION BY ID;

CREATE TABLE TICKETS_TABLE (ID INT, CONTACT_ID INT, SUBJECT VARCHAR, STATUS VARCHAR, TIMESTRING VARCHAR)
  WITH (KAFKA_TOPIC='TICKETS_BY_ID', VALUE_FORMAT='JSON', KEY='ID');
Compare the event stream with the current state.
Event stream (KSQL stream):
ksql> SELECT ID, TIMESTRING, STATUS FROM TICKETS;
2216 | 2012-11-08 10:34:30.000 | closed
8945 | 2019-04-16 23:07:01.000 | waiting
8945 | 2019-04-16 23:52:08.000 | processing
8945 | 2019-04-17 00:10:38.000 | waiting
8952 | 2019-04-17 00:11:23.000 | new
8952 | 2019-04-17 00:12:04.000 | close-request
Current state (KSQL table):
ksql> SELECT ID, TIMESTRING, STATUS FROM TICKETS_TABLE;
2216 | 2012-11-08 10:34:30.000 | closed
8945 | 2019-04-17 00:10:38.000 | waiting
8952 | 2019-04-17 00:12:04.000 | close-request
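The stream/table distinction boils down to keeping only the latest value per key. A small Python sketch of those semantics (using the ticket ID as the key, data as above):

```python
# A KSQL stream is the full event log; a KSQL table keeps only the
# latest value per key (here: per ticket ID).
events = [
    (2216, "closed"),
    (8945, "waiting"),
    (8945, "processing"),
    (8945, "waiting"),
    (8952, "new"),
    (8952, "close-request"),
]

table = {}
for ticket_id, status in events:
    table[ticket_id] = status  # later events overwrite earlier ones

print(table)  # {2216: 'closed', 8945: 'waiting', 8952: 'close-request'}
```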
Now we want an aggregation over the table: the same SUM(CASE…)…GROUP BY trick as above, but based on the current state of each ticket rather than on each event:
SELECT CONTACT_ID,
       SUM(CASE WHEN STATUS='new' THEN 1 ELSE 0 END) AS TICKETS_NEW,
       SUM(CASE WHEN STATUS='processing' THEN 1 ELSE 0 END) AS TICKETS_PROCESSING,
       SUM(CASE WHEN STATUS='waiting' THEN 1 ELSE 0 END) AS TICKETS_WAITING,
       SUM(CASE WHEN STATUS='close-request' THEN 1 ELSE 0 END) AS TICKETS_CLOSEREQUEST,
       SUM(CASE WHEN STATUS='closed' THEN 1 ELSE 0 END) AS TICKETS_CLOSED
  FROM TICKETS_TABLE
 GROUP BY CONTACT_ID;
This gives us what we want:
1472 | 0 | 0 | 1 | 1 | 1
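Put together in plain Python (sample data as above), the two steps, latest state per ticket and then the pivot per contact, reproduce that row:

```python
from collections import Counter

events = [
    {"ID": 2216, "CONTACT_ID": 1472, "STATUS": "closed"},
    {"ID": 8945, "CONTACT_ID": 1472, "STATUS": "waiting"},
    {"ID": 8945, "CONTACT_ID": 1472, "STATUS": "processing"},
    {"ID": 8945, "CONTACT_ID": 1472, "STATUS": "waiting"},
    {"ID": 8952, "CONTACT_ID": 1472, "STATUS": "new"},
    {"ID": 8952, "CONTACT_ID": 1472, "STATUS": "close-request"},
]

# Step 1: current state per ticket (the TICKETS_TABLE step).
latest = {}
for e in events:
    latest[e["ID"]] = e

# Step 2: pivot the current states per contact (the SUM(CASE...) step).
counts = Counter((e["CONTACT_ID"], e["STATUS"]) for e in latest.values())
statuses = ["new", "processing", "waiting", "close-request", "closed"]
row = [counts[(1472, s)] for s in statuses]
print(row)  # [0, 0, 1, 1, 1]
```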
Feed the events of another ticket into the topic and watch the state of the table change. The row in the table is re-emitted whenever the state changes; you can also cancel the SELECT and re-run it to see just the current state. Sample data to try for yourself:
{"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"new","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"processing","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"processing","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"closed","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"close-request","TIMESTRING":"2019-04-16 23:07:01.000"}
If you want to experiment with this further, you can generate a stream of additional dummy data from Mockaroo, piped through awk to slow it down, so that you can see the effect of each message on the resulting aggregate as it arrives:
while [ 1 -eq 1 ]
do curl -s "https://api.mockaroo.com/api/f2d6c8a0?count=1000&key=ff7856d0" | \
awk '{print $0;system("sleep 2");}' | \
kafkacat -b localhost -t tickets -P
done