KTABLE does not detect an identical key (record is inserted instead of updated)

Problem description

Use case

The goal is to inspect each incoming event/row and check whether it is a new row or an update. New rows go to one topic, updated rows go to another.

Approach: keep a lookup table (KTABLE) and do two join operations: 1. an inner join to detect updates; 2. a left join, filtered to where the right-hand table's key is null, to detect inserts/new rows. Create two streams from the results of those two operations, then run insert queries against the streams, which write the records into the lookup table.

The steps to reproduce the problem are below (takes about 7 minutes):

Step 1. Run docker-compose up with the latest Confluent 5.1.0 platform for Docker.

Step 2. Run docker ps.

Note: make sure the broker is up. The broker goes down frequently on my local setup.

Step 3

In a new terminal, open a bash shell into the Schema Registry container. (It's easy to monitor things if you keep this terminal open.)

docker run -it --net=cp-all-in-one_default --rm confluentinc/cp-schema-registry:5.1.0 bash

Step 4

Create a lookup table backed by the topic LOAD.TEST.LOCAL.LOOKUP.TABLE. My schema has a key of type string. Three sample records are shown below; first populate the lookup table with these three initial dummy records:

kafka-avro-console-producer --broker-list broker:9092 --topic LOAD.TEST.LOCAL.LOOKUP.TABLE \
    --property schema.registry.url=http://schema-registry:8081 \
    --property parse.key=true \
    --property key.separator=, \
    --property key.schema='{"type":"string"}' \
    --property value.schema='{"name":"LOAD.TEST.LOCAL.LOOKUP.TABLE","type":"record","namespace":"example.sender.batch","fields":[{"name":"SENDER_CODE","type":"string"},{"name":"SENDER_NAME","type":"string"},{"name":"SENDER_CATEGORY_CODE","type":"string"},{"name":"SENDER_AGENCY_CODE","type":"string"},{"name":"SENDER_SUB_AGENCY_CODE","type":"string"},{"name":"SENDER_FOREIGN_IND","type":"string"},{"name":"SENDER_FOREIGN_COUNTRY","type":"string"},{"name":"SENDER_NAME_ALTERNATE","type":"string"},{"name":"PARENT_SENDER_CODE","type":"string"},{"name":"CHANGE_DATE","type":"string"},{"name":"REQUESTING_LOCATION","type":"string"},{"name":"REQUEST_DATE","type":"string"},{"name":"REPLACEMENT_SENDER_CODE","type":"string"},{"name":"SENDER_STATUS","type":"string"},{"name":"SENDER_DUNS","type":"string"},{"name":"ADDRESSLINE1","type":"string"},{"name":"ADDRESSLINE2","type":"string"},{"name":"ADDRESSLINE3","type":"string"},{"name":"ADDRESS4","type":"string"},{"name":"CITY","type":"string"},{"name":"STATE","type":"string"},{"name":"POSTAL_CODE","type":"string"},{"name":"URL","type":"string"},{"name":"SENDER_ACRONYM","type":"string"},{"name":"DEACTIVATED_DATE","type":"string"},{"name":"Kafka_TimeEvent","type":"string"}]}'

Now you can insert the records below; just paste in the three records that follow. If you press Return more than once and get an exception, simply rerun the same command above and paste the records again, pressing Return once after each.

"SVI6FQ",{"SENDER_CODE":"SVI6FQ","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"373 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXB","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
"SVI6FR",{"SENDER_CODE":"SVI6FR","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"374 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 973","ADDRESSLINE3":"MAILBOXC","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
"SVI6FN",{"SENDER_CODE":"SVI6FN","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"372 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXA","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"} 

Press Ctrl+C to exit the producer.

Step 5. In a different terminal, open the KSQL CLI:

docker run --network cp-all-in-one_default --interactive --tty --rm confluentinc/cp-ksql-cli:latest http://ksql-server:8088
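
Once the CLI connects, it can be worth a quick sanity check before creating anything. A minimal check (SHOW TOPICS is a standard KSQL CLI statement; the topic from step 4 should be listed):

-- List the topics the KSQL server can see; LOAD.TEST.LOCAL.LOOKUP.TABLE should appear here.
SHOW TOPICS;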

Step 6. Create a KTABLE:

create table load_test_local_lookup_table with (KAFKA_TOPIC='LOAD.TEST.LOCAL.LOOKUP.TABLE',VALUE_FORMAT='AVRO',KEY='SENDER_CODE');

Step 7. Make sure the following property is set so that you see results from the earliest offset. Run this in KSQL:

ksql> SET 'auto.offset.reset'='earliest';

You will see the following message: Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
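
At this point you can verify that the table sees the three seed records. A quick check, assuming the table from step 6 and the offset reset above (the LIMIT clause ends the otherwise-continuous query after three rows):

-- Should print the three dummy records produced in step 4.
SELECT SENDER_CODE, SENDER_NAME, CITY FROM load_test_local_lookup_table LIMIT 3;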

Step 8. Now create the topic that your events will stream into. Use the Schema Registry bash from step 4. Also, populate the same records into the underlying topic:

 kafka-avro-console-producer --broker-list broker:9092 --topic LOAD.TEST.LOCAL.EVENT.STREAM \
    --property schema.registry.url=http://schema-registry:8081 \
    --property parse.key=true \
    --property key.separator=, \
    --property key.schema='{"type":"string"}' \
    --property value.schema='{"name":"LOAD.TEST.LOCAL.EVENT.STREAM","type":"record","namespace":"example.sender.batch","fields":[{"name":"SENDER_CODE","type":"string"},{"name":"SENDER_NAME","type":"string"},{"name":"SENDER_CATEGORY_CODE","type":"string"},{"name":"SENDER_AGENCY_CODE","type":"string"},{"name":"SENDER_SUB_AGENCY_CODE","type":"string"},{"name":"SENDER_FOREIGN_IND","type":"string"},{"name":"SENDER_FOREIGN_COUNTRY","type":"string"},{"name":"SENDER_NAME_ALTERNATE","type":"string"},{"name":"PARENT_SENDER_CODE","type":"string"},{"name":"CHANGE_DATE","type":"string"},{"name":"REQUESTING_LOCATION","type":"string"},{"name":"REQUEST_DATE","type":"string"},{"name":"REPLACEMENT_SENDER_CODE","type":"string"},{"name":"SENDER_STATUS","type":"string"},{"name":"SENDER_DUNS","type":"string"},{"name":"ADDRESSLINE1","type":"string"},{"name":"ADDRESSLINE2","type":"string"},{"name":"ADDRESSLINE3","type":"string"},{"name":"ADDRESS4","type":"string"},{"name":"CITY","type":"string"},{"name":"STATE","type":"string"},{"name":"POSTAL_CODE","type":"string"},{"name":"URL","type":"string"},{"name":"SENDER_ACRONYM","type":"string"},{"name":"DEACTIVATED_DATE","type":"string"},{"name":"Kafka_TimeEvent","type":"string"}]}'

"SVI6FQ",{"SENDER_CODE":"SVI6FQ","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"373 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXB","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
"SVI6FR",{"SENDER_CODE":"SVI6FR","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"374 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 973","ADDRESSLINE3":"MAILBOXC","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
"SVI6FN",{"SENDER_CODE":"SVI6FN","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"372 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXA","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}

Step 9

Create a stream over this event topic:

create stream load_test_local_event_stream with (KAFKA_TOPIC='LOAD.TEST.LOCAL.EVENT.STREAM',VALUE_FORMAT='AVRO',KEY='SENDER_CODE');

Step 10

Derive a stream from the stream-table join to detect rows that already exist. Let's call it update_stream. The topic created for this stream below will only ever contain updates. That is one of my use cases: I have to filter out the update messages.

create stream load_test_update_stream as select event.*  FROM load_test_local_event_stream event JOIN  load_test_local_lookup_table lookup ON event.sender_code = lookup.sender_Code;
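
To watch this join's output while you test, you can leave a continuous query running in a separate KSQL CLI session. A minimal sketch (in KSQL 5.1 a bare SELECT runs continuously, and the columns carry the EVENT_ prefix produced by select event.* above; Ctrl+C stops the query):

-- Emits a row only when an incoming event's key already exists in the lookup table.
SELECT EVENT_SENDER_CODE, EVENT_SENDER_NAME FROM load_test_update_stream;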

Step 11

Create a stream over the same topic that backs the lookup table, so that when you want to update the lookup table you can insert into this stream. (If I remember correctly, you cannot insert into a KTABLE directly from a stream.) So do it this way:

create stream load_test_lookup_feed_stream with (KAFKA_TOPIC='LOAD.TEST.LOCAL.LOOKUP.TABLE',VALUE_FORMAT='AVRO',KEY='SENDER_CODE');

Step 12. Run the insert query. This query inserts into the lookup-table feed stream, so whenever a message becomes available in update_stream it updates the lookup table:

Insert into load_test_lookup_feed_stream  select EVENT_SENDER_CODE AS SENDER_CODE, EVENT_SENDER_NAME AS SENDER_NAME, EVENT_SENDER_CATEGORY_CODE AS SENDER_CATEGORY_CODE , EVENT_SENDER_AGENCY_CODE AS SENDER_AGENCY_CODE , EVENT_SENDER_SUB_AGENCY_CODE AS SENDER_SUB_AGENCY_CODE, EVENT_SENDER_FOREIGN_IND AS SENDER_FOREIGN_IND, EVENT_SENDER_FOREIGN_COUNTRY AS SENDER_FOREIGN_COUNTRY  , EVENT_SENDER_NAME_ALTERNATE AS SENDER_NAME_ALTERNATE, EVENT_PARENT_SENDER_CODE AS PARENT_SENDER_CODE ,EVENT_CHANGE_DATE AS CHANGE_DATE, EVENT_REQUESTING_LOCATION AS REQUESTING_LOCATION , EVENT_REQUEST_DATE AS REQUEST_DATE, EVENT_REPLACEMENT_SENDER_CODE AS REPLACEMENT_SENDER_CODE  , EVENT_SENDER_STATUS AS SENDER_STATUS, EVENT_SENDER_DUNS AS SENDER_DUNS , EVENT_ADDRESSLINE1 AS ADDRESSLINE1 , EVENT_ADDRESSLINE2 AS ADDRESSLINE2, EVENT_ADDRESSLINE3 AS ADDRESSLINE3 , EVENT_ADDRESS4 AS ADDRESS4  , EVENT_CITY AS CITY , EVENT_STATE AS STATE, EVENT_POSTAL_CODE AS POSTAL_CODE, EVENT_URL AS URL, EVENT_SENDER_ACRONYM AS SENDER_ACRONYM , EVENT_DEACTIVATED_DATE AS DEACTIVATED_DATE, EVENT_KAFKA_TIMEEVENT AS KAFKA_TIMEEVENT from load_test_update_stream partition by SENDER_CODE ;

❗ Problem: although this updates my lookup table, the record is applied as a new record, not as an update. To reproduce this, follow step 15A.

Step 13. Very similar to the update case, create a stream that detects new records among the events:

create stream load_test_insert_stream as select event.*  FROM load_test_local_event_stream event left JOIN  load_test_local_lookup_table lookup ON event.sender_code = lookup.sender_Code where lookup.sender_Code is null ;
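
As with the update stream, you can keep an eye on this one while testing. A sketch under the same assumptions as before (EVENT_-prefixed columns, continuous query):

-- Emits a row only when an incoming event's key is not yet in the lookup table.
SELECT EVENT_SENDER_CODE, EVENT_SENDER_NAME FROM load_test_insert_stream;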

Verification: you can optionally run just the SELECT query to see what is happening. With your Schema Registry bash still open, insert a new record with your own key (try inserting a new record as described in section 15A). The new message will show up in this stream.

Step 14. Create the insert query as before. This one inserts back into the lookup table. Your lookup table is now populated with the new message:

Insert into load_test_lookup_feed_stream select EVENT_SENDER_CODE AS SENDER_CODE, EVENT_SENDER_NAME AS SENDER_NAME, EVENT_SENDER_CATEGORY_CODE AS SENDER_CATEGORY_CODE , EVENT_SENDER_AGENCY_CODE AS SENDER_AGENCY_CODE , EVENT_SENDER_SUB_AGENCY_CODE AS SENDER_SUB_AGENCY_CODE, EVENT_SENDER_FOREIGN_IND AS SENDER_FOREIGN_IND, EVENT_SENDER_FOREIGN_COUNTRY AS SENDER_FOREIGN_COUNTRY  , EVENT_SENDER_NAME_ALTERNATE AS SENDER_NAME_ALTERNATE, EVENT_PARENT_SENDER_CODE AS PARENT_SENDER_CODE ,EVENT_CHANGE_DATE AS CHANGE_DATE, EVENT_REQUESTING_LOCATION AS REQUESTING_LOCATION , EVENT_REQUEST_DATE AS REQUEST_DATE, EVENT_REPLACEMENT_SENDER_CODE AS REPLACEMENT_SENDER_CODE  , EVENT_SENDER_STATUS AS SENDER_STATUS, EVENT_SENDER_DUNS AS SENDER_DUNS , EVENT_ADDRESSLINE1 AS ADDRESSLINE1 , EVENT_ADDRESSLINE2 AS ADDRESSLINE2, EVENT_ADDRESSLINE3 AS ADDRESSLINE3 , EVENT_ADDRESS4 AS ADDRESS4  , EVENT_CITY AS CITY , EVENT_STATE AS STATE, EVENT_POSTAL_CODE AS POSTAL_CODE, EVENT_URL AS URL, EVENT_SENDER_ACRONYM AS SENDER_ACRONYM , EVENT_DEACTIVATED_DATE AS DEACTIVATED_DATE, EVENT_KAFKA_TIMEEVENT AS KAFKA_TIMEEVENT from load_test_insert_stream partition by SENDER_CODE ;

Step 15

What the problem is, and how to reproduce it.

Step 15A: how to insert a new sample record

Run the command from step 8 (with the schema). Insert/paste a new record like the one below. Note that I have changed both the message key and the sender code. Your message key and row key should always match, e.g. "SVI6FW" and "SENDER_CODE":"SVI6FW".

"SVI6FW",{"SENDER_CODE":"SVI6FW","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"373 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXB","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}

Step 15B: how to update a sample record

Very similar to the new-record insert in 15A, but use the same message key and just update the name or some other value. For example, 'SAM II' becomes 'SAM III':

"SVI6FW",{"SENDER_CODE":"SVI6FW","SENDER_NAME":"SENDER SAM III","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"373 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXB","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}

The problem: as you can see, my lookup table does not get updated; it treats every message as a new message, even when it is sent with the same key. As a result I cannot detect updates: every message is a new message.

You can test this by doing the following.

  1. Send a new message with your own key (15A). It will show up in load_test_insert_stream.
  2. Send an updated message with the same key, as in 15B. It should show up in load_test_update_stream, but instead it lands in load_test_insert_stream, and the lookup table treats it as a new message. (See the observation queries sketched below.)
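
To observe the misrouting directly, you can run both continuous queries side by side (one KSQL CLI session each) while producing the 15A and 15B records. A minimal sketch:

-- Session 1: receives the 15A record, and (incorrectly) also the 15B update.
SELECT EVENT_SENDER_CODE, EVENT_SENDER_NAME FROM load_test_insert_stream;

-- Session 2: should receive the 15B update, but stays empty, which is the problem.
SELECT EVENT_SENDER_CODE, EVENT_SENDER_NAME FROM load_test_update_stream;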

Any new approaches/suggestions are welcome!

Tags: apache-kafka, confluent-schema-registry, ksqldb

Solution


I'm assuming your design is basically close to this:

-- stream of inputs:
CREATE STREAM INPUT (ID INT KEY, V0 INT) WITH (kafka_topic='test_topic', value_format='JSON', PARTITIONS=1);

-- table built from the stream of inserts:
CREATE TABLE EXISTING (ID INT PRIMARY KEY, IGNORED INT) WITH (kafka_topic='INSERTS', value_format='JSON', PARTITIONS=1);

-- stream of inserts:
CREATE STREAM INSERTS AS SELECT INPUT.ID, INPUT.V0 AS V0 FROM INPUT LEFT JOIN EXISTING ON INPUT.ID = EXISTING.ID WHERE EXISTING.ID IS NULL;

-- stream of updates:
CREATE STREAM UPDATES AS SELECT INPUT.ID, INPUT.V0 AS V0 FROM INPUT JOIN EXISTING ON INPUT.ID = EXISTING.ID;

Then you insert some records:

INSERT INTO INPUT VALUES (1, 3);
INSERT INTO INPUT VALUES (2, 4);
INSERT INTO INPUT VALUES (1, 5);

and expect the first two rows to end up in the INSERTS stream and the last row in the UPDATES stream.
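
To see which stream each row actually landed in, you can run push queries against both result streams. A sketch for ksqlDB 0.11, where the EMIT CHANGES clause is required for continuous queries (unlike the bare SELECT of older KSQL):

-- Read from the start of each topic so the earlier inserts are visible.
SET 'auto.offset.reset'='earliest';

-- Expected: rows (1, 3) and (2, 4).
SELECT * FROM INSERTS EMIT CHANGES;

-- Expected: row (1, 5).
SELECT * FROM UPDATES EMIT CHANGES;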

I've tested the above on ksqlDB version 0.11 and it does work... ish.

If you insert each record one at a time from the CLI, the output is as you'd expect. However, if you insert all three rows together, e.g. by running them on a single line in the CLI:

INSERT INTO INPUT VALUES (1, 3);INSERT INTO INPUT VALUES (2, 4);INSERT INTO INPUT VALUES (1, 5);

then all three rows end up in the INSERTS stream. Why, you might ask?

tl;dr: the solution is brittle. It won't work if an update arrives close to the corresponding insert.

There is a race condition in the design. If the join processes the second input row before the first row's output has been produced to the INSERTS topic and polled back by the join into the EXISTING table, then the table will not yet contain that row, so the second row will incorrectly be sent to INSERTS rather than UPDATES.

There are some configs you can play with to see whether you can make it work for your use case; a sketch of setting them from the CLI follows this list.

  • Setting max.task.idle.ms higher means the join will wait longer for data to appear on the table side of the join. However, this won't help if the update and the insert happen within the same millisecond, and raising it hurts throughput and latency.
  • Setting cache.max.bytes.buffering to zero turns off buffering in the Streams library, which may help.
  • Setting linger.ms to zero means the Kafka producer will not delay sending messages.
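
For example, these can be set per session from the ksqlDB CLI. A sketch, assuming your ksqlDB version passes ksql.streams.-prefixed properties through to Kafka Streams (the exact property names accepted by SET vary across versions, and 2000 is just an illustrative value):

-- Let the join wait longer for the table side to catch up.
SET 'ksql.streams.max.task.idle.ms'='2000';
-- Turn off Kafka Streams record caching so join output is forwarded immediately.
SET 'ksql.streams.cache.max.bytes.buffering'='0';
-- Stop the producer inside Streams from batching/delaying sends.
SET 'ksql.streams.producer.linger.ms'='0';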

Even with all of this, the system is asynchronous, and your results may vary. If an update never arrives close to its insert, the system will work. But if an update does arrive close to its insert, you may find it misclassified as an insert.

