apache-kafka - Debezium MySQL 连接器可以通过事件的操作类型将数据更改事件路由到不同的主题吗?
问题描述
Debezium 支持三种类型的数据更改事件:
- 插入
- 删除
- 更新
我知道op
Debezium 发布消息的有效负载中有一个字段可以识别事件类型,但我想知道无论如何我可以通过操作类型(如 SMT)将这三种类型的数据更改事件路由到不同的 Kafka 主题?
解决方案
单消息转换
正如您所建议的,单消息转换是在这里使用的一个不错的选择。Debezium 有一个当前处于测试阶段的转换ContentBasedRouter
,您可以使用它使用包括 Groovy 在内的语言对路由进行编码。
数据库
您可以使用 ksqlDB 执行此操作:
-- Declare source topic from Debezium as ksqlDB stream
CREATE STREAM ORDERS WITH (KAFKA_TOPIC='asgard.demo.ORDERS', VALUE_FORMAT='AVRO');
-- Create three streams (backed by Kafka topics) based on the op-type
CREATE STREAM ORDERS_UPDATES AS SELECT * FROM ORDERS WHERE OP='u';
CREATE STREAM ORDERS_DELETES AS SELECT * FROM ORDERS WHERE OP='d';
CREATE STREAM ORDERS_CREATES AS SELECT * FROM ORDERS WHERE OP='c';
查看数据
ksql> SHOW TOPICS;
Kafka Topic | Partitions | Partition Replicas
-------------------------------------------------------------------------
ORDERS_CREATES | 1 | 1
ORDERS_DELETES | 1 | 1
ORDERS_UPDATES | 1 | 1
检查计数
ksql> SELECT OP,COUNT(*) AS EVENTS FROM ORDERS GROUP BY OP EMIT CHANGES;
+-------+----------+
|OP |EVENTS |
+-------+----------+
|u |3 |
|c |502 |
|d |5 |
ksql> SELECT 'ORDERS_UPDATES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT
FROM ORDERS_UPDATES GROUP BY 'ORDERS_UPDATES' EMIT CHANGES LIMIT 1 ;
+----------------+-------------+
|TOPIC_NAME |EVENT_COUNT |
+----------------+-------------+
|ORDERS_UPDATES |3 |
Limit Reached
Query terminated
ksql> SELECT 'ORDERS_CREATES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT
FROM ORDERS_CREATES GROUP BY 'ORDERS_CREATES' EMIT CHANGES LIMIT 1 ;
+----------------+-------------+
|TOPIC_NAME |EVENT_COUNT |
+----------------+-------------+
|ORDERS_CREATES |503 |
Limit Reached
Query terminated
ksql> SELECT 'ORDERS_DELETES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT
FROM ORDERS_DELETES GROUP BY 'ORDERS_DELETES' EMIT CHANGES LIMIT 1 ;
+----------------+-------------+
|TOPIC_NAME |EVENT_COUNT |
+----------------+-------------+
|ORDERS_DELETES |5 |
Limit Reached
Query terminated
推荐阅读
- python - 为什么第二次尝试 re.search 错误在第二次尝试中出现并在第一次尝试中起作用?
- gitlab - GitLab-CI:无法再使用 lftp 进行部署
- javascript - 使 Tilezoom 移动友好
- postgresql - Postgres:如何根据字符串生成唯一编号?
- python - 计算两个平面之间形状的体积
- javascript - 编写插入变量的 HTML 文件?
- sql - 在 Haskell 中可以调用带有表类型参数的存储过程吗?
- ruby - bash_profile 环境变量问题:ruby 版本将被初始化
- python - 如何在不使用 display.specshow 的情况下缩放 librosa 频谱图的频率轴?
- sql - 获取自动化错误 Excel VBA 尝试更新 SQL Db