apache-kafka-connect - KSQL stream-table inner join does not give the expected results
Problem description
Change Data Capture
We are building a solution that streams data from system A to system B using Debezium, Kafka Connect & KSQL.
The following requirements must be met:
- every item system B is interested in must (eventually) be available in system B
- system B should be notified only once whenever an item "changes" (initial read, create, update, delete)
- the data for each item must be complete
- we cannot change anything on the existing SQL schema
The problem we are running into is that we always lose the same number of rows when we perform the stream-table inner join.
1. The existing SQL tables
create table movie (
internal_id char(11) not null primary key,
producer_id char(7) not null references producer(producer_id),
movie_id char(11) not null unique,
title char(50) not null,
release_date date not null,
created_timestamp datetime(6) not null,
changed_timestamp datetime(6) not null
)
create table producer (
producer_id char(7) not null primary key,
name char(50) not null,
created_timestamp datetime(6) not null,
changed_timestamp datetime(6) not null
)
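Before blaming the join, it may be worth confirming on the source database that the claimed referential integrity actually holds, whitespace included. A minimal check (plain SQL against system A, assuming only the two tables above):

```sql
-- Count movies whose producer_id (trimmed) matches no producer.
-- 0 means every movie should be able to find a join partner downstream.
SELECT count(*) AS orphaned_movies
FROM movie m
LEFT JOIN producer p
  ON rtrim(m.producer_id) = rtrim(p.producer_id)
WHERE p.producer_id IS NULL;
```

If this returns anything other than 0, the missing rows originate in the data itself, not in KSQL.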
2. Create streams from the topics containing the Debezium data-change events
CREATE STREAM MOVIE_DBZ (internal_id string, producer_id string, movie_id string, title string, release_date bigint, created_timestamp bigint, changed_timestamp bigint, op string)
WITH (KAFKA_TOPIC='dbserver.movie', TIMESTAMP='changed_timestamp', VALUE_FORMAT='json');
CREATE STREAM PRODUCER_DBZ (producer_id string, name string, created_timestamp bigint, changed_timestamp bigint, op string)
WITH (KAFKA_TOPIC='dbserver.producer', TIMESTAMP='changed_timestamp', VALUE_FORMAT='json');
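One thing worth verifying at this point is that the change events really arrive flat. Debezium's default envelope nests the row under before/after; the flat column lists above only line up if the connector unwraps the envelope (e.g. with the ExtractNewRecordState transform). A quick check from the ksqlDB CLI (a sketch, using the topic and stream names above):

```sql
-- Inspect a few raw events on the source topic:
PRINT 'dbserver.movie' FROM BEGINNING LIMIT 5;

-- Count how many events per operation type actually reached the stream;
-- rows swallowed here never make it to the join.
SELECT op, COUNT(*) AS events
FROM MOVIE_DBZ
GROUP BY op
EMIT CHANGES;
```

If the columns come back NULL in the PRINT output, the streams are reading the nested envelope and the declared schema does not match.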
3. Trim the producer_id
CREATE STREAM MOVIE_DBZ_TRIMMED
WITH (KAFKA_TOPIC='MOVIE_TRIMMED', VALUE_FORMAT='json', TIMESTAMP='changed_timestamp', PARTITIONS=1) AS
SELECT TRIM(producer_id) as producer_id, movie_id as movie_id, changed_timestamp as changed_timestamp
FROM MOVIE_DBZ;
CREATE STREAM PRODUCER_DBZ_TRIMMED
WITH (KAFKA_TOPIC='PRODUCER_TRIMMED', VALUE_FORMAT='json', TIMESTAMP='changed_timestamp', PARTITIONS=1) AS
SELECT TRIM(producer_id) as producer_id, name as name, changed_timestamp as changed_timestamp
FROM PRODUCER_DBZ ;
4. Repartition by producer_id
CREATE STREAM MOVIE_STREAM
WITH (KAFKA_TOPIC='MOVIE_REPARTIONED', VALUE_FORMAT='json', TIMESTAMP='changed_timestamp', PARTITIONS=1) AS
SELECT producer_id as producer_id, movie_id as movie_id, changed_timestamp as changed_timestamp
FROM MOVIE_DBZ_TRIMMED
PARTITION BY producer_id;
CREATE STREAM PRODUCER_STREAM
WITH (KAFKA_TOPIC='PRODUCER_REPARTIONED', VALUE_FORMAT='json', TIMESTAMP='changed_timestamp', PARTITIONS=1) AS
SELECT producer_id as producer_id, name as name, changed_timestamp as changed_timestamp
FROM PRODUCER_DBZ_TRIMMED
PARTITION BY producer_id;
5. Create a table from the producer stream
CREATE TABLE PRODUCER_NAMES_TABLE (producer_id string PRIMARY KEY, name string, changed_timestamp bigint)
WITH (KAFKA_TOPIC='PRODUCER_REPARTIONED', VALUE_FORMAT='json', TIMESTAMP='changed_timestamp', PARTITIONS=1);
6. Create the stream-table inner join that enriches each movie with the corresponding producer name
CREATE STREAM MOVIE_CHANGES_STREAM AS
SELECT movie.producer_id AS producer_id, movie.movie_id AS movie_id, producer.name AS name
FROM MOVIE_STREAM as movie
INNER JOIN PRODUCER_NAMES_TABLE as producer on movie.producer_id = producer.producer_id
emit changes;
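To see which movies fall out of the inner join, one option is a LEFT JOIN variant of the same statement: unmatched movies then come through with a NULL name instead of being dropped silently. A sketch using the streams and table defined above:

```sql
CREATE STREAM MOVIE_CHANGES_DEBUG AS
SELECT movie.producer_id AS producer_id,
       movie.movie_id AS movie_id,
       producer.name AS name
FROM MOVIE_STREAM AS movie
LEFT JOIN PRODUCER_NAMES_TABLE AS producer
  ON movie.producer_id = producer.producer_id
EMIT CHANGES;

-- The missing movies surface as rows without a producer name:
SELECT producer_id, movie_id
FROM MOVIE_CHANGES_DEBUG
WHERE name IS NULL
EMIT CHANGES;
```

If the NULL rows all carry valid producer_ids, the producers most likely had not been materialized into PRODUCER_NAMES_TABLE yet when those movie events were processed, which would point at a timing/timestamp issue rather than a key mismatch.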
When we run the join above, we do not get the expected results:
The movie SQL table contains 16,552 rows, each of which references a row in the producer table. For some strange reason, we always get only 16,331 results, so 221 movies are missing.
Things we have tried:
- producer_id can contain whitespace in the legacy database -> trimmed all producer_ids
- we re-keyed each stream by producer_id so that the join is performed correctly and multiple partitions can be supported in the future
- we set 'changed_timestamp' as the TIMESTAMP to use instead of the default ROWTIME
- before starting to stream, we set the changed_timestamp of all producer rows to '1970-01-01 00:00:01' to make sure every producer row carries an "older" timestamp than the movie stream items
What can we do differently, or additionally, to make sure all movies are present in the result of the stream-table inner join?
Solution