首页 > 解决方案 > Debezium MySQL 连接器可以通过事件的操作类型将数据更改事件路由到不同的主题吗?

问题描述

Debezium 支持三种类型的数据更改事件:

我知道opDebezium 发布消息的有效负载中有一个字段可以识别事件类型,但我想知道无论如何我可以通过操作类型(如 SMT)将这三种类型的数据更改事件路由到不同的 Kafka 主题?

标签: apache-kafkaapache-kafka-connectdebezium

解决方案


单消息转换

正如您所建议的,单消息转换是在这里使用的一个不错的选择。Debezium 有一个当前处于测试阶段的转换ContentBasedRouter,您可以使用它使用包括 Groovy 在内的语言对路由进行编码。

数据库

您可以使用 ksqlDB 执行此操作:

-- Declare source topic from Debezium as ksqlDB stream
CREATE STREAM ORDERS WITH (KAFKA_TOPIC='asgard.demo.ORDERS', VALUE_FORMAT='AVRO');

-- Create three streams (backed by Kafka topics) based on the op-type
CREATE STREAM ORDERS_UPDATES AS SELECT * FROM ORDERS WHERE OP='u';
CREATE STREAM ORDERS_DELETES AS SELECT * FROM ORDERS WHERE OP='d';
CREATE STREAM ORDERS_CREATES AS SELECT * FROM ORDERS WHERE OP='c';

查看数据

ksql> SHOW TOPICS;

 Kafka Topic                           | Partitions | Partition Replicas
-------------------------------------------------------------------------
 ORDERS_CREATES                        | 1          | 1
 ORDERS_DELETES                        | 1          | 1
 ORDERS_UPDATES                        | 1          | 1

检查计数

ksql> SELECT OP,COUNT(*) AS EVENTS FROM ORDERS GROUP BY OP EMIT CHANGES;
+-------+----------+
|OP     |EVENTS    |
+-------+----------+
|u      |3         |
|c      |502       |
|d      |5         |

ksql> SELECT 'ORDERS_UPDATES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT 
        FROM ORDERS_UPDATES GROUP BY 'ORDERS_UPDATES' EMIT CHANGES LIMIT 1 ;
+----------------+-------------+
|TOPIC_NAME      |EVENT_COUNT  |
+----------------+-------------+
|ORDERS_UPDATES  |3            |
Limit Reached
Query terminated

ksql> SELECT 'ORDERS_CREATES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT 
        FROM ORDERS_CREATES GROUP BY 'ORDERS_CREATES' EMIT CHANGES LIMIT 1 ;
+----------------+-------------+
|TOPIC_NAME      |EVENT_COUNT  |
+----------------+-------------+
|ORDERS_CREATES  |503          |
Limit Reached
Query terminated

ksql> SELECT 'ORDERS_DELETES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT 
        FROM ORDERS_DELETES GROUP BY 'ORDERS_DELETES' EMIT CHANGES LIMIT 1 ;
+----------------+-------------+
|TOPIC_NAME      |EVENT_COUNT  |
+----------------+-------------+
|ORDERS_DELETES  |5            |
Limit Reached
Query terminated


推荐阅读