KSql stream-table inner join does not give the expected results

Problem description

We are building a Change Data Capture (CDC) solution that streams data from system A to system B using Debezium, Kafka Connect and ksqlDB.
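For context, the dbserver.movie and dbserver.producer topics would be fed by a Debezium source connector along the following lines. This is only a sketch: the original post does not show the connector configuration, so the database type (illustrated here with Debezium's MySQL connector), hostnames, credentials and table list are all assumptions, and Debezium's default topic names normally also include the schema/database name, so the post's two-part topic names suggest either different settings or a topic-routing transform. It uses ksqlDB's CREATE SOURCE CONNECTOR syntax.

-- Sketch only: register a Debezium source connector from ksqlDB.
-- All connection properties below are placeholders, not taken from the post.
CREATE SOURCE CONNECTOR system_a_cdc WITH (
  'connector.class'                          = 'io.debezium.connector.mysql.MySqlConnector',
  'database.hostname'                        = 'system-a-db',
  'database.port'                            = '3306',
  'database.user'                            = 'debezium',
  'database.password'                        = '<secret>',
  'database.server.id'                       = '1',
  'database.server.name'                     = 'dbserver',
  'table.include.list'                       = 'appdb.movie,appdb.producer',
  'database.history.kafka.bootstrap.servers' = 'kafka:9092',
  'database.history.kafka.topic'             = 'dbhistory.system_a'
);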

The following requirements must be met:

The problem we are running into is that we always lose the same number of records when we perform the stream-table inner join.

1. The existing SQL tables

create table movie (
  internal_id char(11) not null primary key,
  producer_id char(7) not null foreign key references producer,
  movie_id char(11) not null unique,
  title char(50) not null,
  release_date date not null,
  created_timestamp datetime(6) not null,
  changed_timestamp datetime(6) not null
);

create table producer (
  producer_id char(7) not null primary key,
  name char(50) not null,
  created_timestamp datetime(6) not null,
  changed_timestamp datetime(6) not null
);

2. Create streams over the topics that contain the Debezium data change events

CREATE STREAM MOVIE_DBZ (internal_id string, producer_id string, movie_id string, title string, release_date bigint, created_timestamp bigint, changed_timestamp bigint, op string) 
WITH (KAFKA_TOPIC='dbserver.movie', TIMESTAMP='changed_timestamp', VALUE_FORMAT='json');
CREATE STREAM PRODUCER_DBZ (producer_id string, name string, created_timestamp bigint, changed_timestamp bigint, op string) 
WITH (KAFKA_TOPIC='dbserver.producer', TIMESTAMP='changed_timestamp', VALUE_FORMAT='json');
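Before going further, it can help to confirm that the change events actually look the way these stream definitions expect. A quick sanity check from the ksqlDB CLI (a sketch), reading the topics from the beginning:

-- Read from the start of the topics so historical change events are visible.
SET 'auto.offset.reset' = 'earliest';

-- Inspect a handful of events per stream; 'op' should carry Debezium's c/u/d/r codes.
SELECT * FROM MOVIE_DBZ EMIT CHANGES LIMIT 5;
SELECT * FROM PRODUCER_DBZ EMIT CHANGES LIMIT 5;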

3. Trim the producer_id

CREATE STREAM MOVIE_DBZ_TRIMMED 
WITH (KAFKA_TOPIC='MOVIE_TRIMMED', VALUE_FORMAT='json', TIMESTAMP='changed_timestamp', PARTITIONS=1) AS
SELECT TRIM(producer_id) as producer_id, movie_id as movie_id, changed_timestamp as changed_timestamp 
FROM MOVIE_DBZ;
CREATE STREAM PRODUCER_DBZ_TRIMMED 
WITH (KAFKA_TOPIC='PRODUCER_TRIMMED', VALUE_FORMAT='json', TIMESTAMP='changed_timestamp', PARTITIONS=1) AS
SELECT TRIM(producer_id) as producer_id, name as name, changed_timestamp as changed_timestamp 
FROM PRODUCER_DBZ;
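Because producer_id is a fixed-length CHAR column, the raw values carry trailing padding. A small check such as the following (a sketch) confirms that the trimmed streams really hold unpadded join keys:

-- LEN() returns the string length; after TRIM the padded CHAR(7) keys
-- should come back at their true length.
SELECT producer_id, LEN(producer_id) AS key_length
FROM PRODUCER_DBZ_TRIMMED
EMIT CHANGES
LIMIT 10;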

4. Repartition by producer_id

CREATE STREAM MOVIE_STREAM
WITH (KAFKA_TOPIC='MOVIE_REPARTIONED', VALUE_FORMAT='json', TIMESTAMP='changed_timestamp', PARTITIONS=1) AS
SELECT producer_id as producer_id, movie_id as movie_id, changed_timestamp as changed_timestamp 
FROM MOVIE_DBZ_TRIMMED 
PARTITION BY producer_id;
CREATE STREAM PRODUCER_STREAM
WITH (KAFKA_TOPIC='PRODUCER_REPARTIONED', VALUE_FORMAT='json', TIMESTAMP='changed_timestamp', PARTITIONS=1) AS
SELECT producer_id as producer_id, name as name, changed_timestamp as changed_timestamp 
FROM PRODUCER_DBZ_TRIMMED 
PARTITION BY producer_id;
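For the join to work, both sides must be keyed and co-partitioned on the same producer_id. One way to verify this (a sketch) is to print the underlying topics and check that the record key is the trimmed producer_id on both:

-- The record key shown on each line should be the trimmed producer_id on both topics;
-- both topics were created with a single partition, so co-partitioning holds.
PRINT 'MOVIE_REPARTIONED' FROM BEGINNING LIMIT 5;
PRINT 'PRODUCER_REPARTIONED' FROM BEGINNING LIMIT 5;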

5. Create a table from the producer stream

CREATE TABLE PRODUCER_NAMES_TABLE (producer_id string PRIMARY KEY, name string, changed_timestamp bigint)
WITH (KAFKA_TOPIC='PRODUCER_REPARTIONED', VALUE_FORMAT='json', TIMESTAMP='changed_timestamp', PARTITIONS=1);
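A push query against the table (a sketch) shows the latest name per producer_id and makes it easy to spot producer keys that never make it into the table:

-- Streams the table's changelog: one row per producer_id with the latest name.
SELECT producer_id, name FROM PRODUCER_NAMES_TABLE EMIT CHANGES;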

6. Create the stream-table inner join that enriches every movie with the corresponding producer name

CREATE STREAM MOVIE_CHANGES_STREAM AS 
SELECT movie.producer_id AS producer_id, movie.movie_id AS movie_id, producer.name AS name
FROM MOVIE_STREAM as movie
INNER JOIN PRODUCER_NAMES_TABLE as producer on movie.producer_id = producer.producer_id
emit changes;

When we run the stream above, we do not get the expected result:

The movie SQL table contains 16,552 rows, each of which references a row in the producer table.
For some strange reason, we always get only 16,331 results.
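One way to narrow down which movies drop out (a sketch, not part of the original post) is to repeat the join from step 6 as a LEFT JOIN, so that movies whose producer is missing from the table at the moment the movie record arrives show up with a NULL name instead of disappearing:

-- Diagnostic variant of step 6: a LEFT JOIN keeps every movie.
CREATE STREAM MOVIE_JOIN_DEBUG AS
SELECT movie.producer_id AS producer_id, movie.movie_id AS movie_id, producer.name AS name
FROM MOVIE_STREAM AS movie
LEFT JOIN PRODUCER_NAMES_TABLE AS producer ON movie.producer_id = producer.producer_id
EMIT CHANGES;

-- The rows returned here are the movies that found no producer at join time.
SELECT producer_id, movie_id FROM MOVIE_JOIN_DEBUG
WHERE name IS NULL
EMIT CHANGES;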

Things we have tried:

What could we do differently, or what more could we do, to make sure that every movie is present in the stream-table inner join?

Tags: apache-kafka-connect, confluent-platform, ksqldb, debezium, cdc

Solution

