首页 > 解决方案 > JDBC Sink 连接器:如何将 Kafka 消息中的字段映射到数据库表的列

问题描述

我正在使用Confluent JDBC Sink Connector来捕获从 Kafka 主题到数据库的所有更改。我的消息是没有任何附加架构的 JSON 格式。例如:

{ "key1": "value1", "key2": 100}

这是我的配置:

name=sink-mysql-1
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=send_1
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
database.hostname=jdbc:mysql://0.0.0.0:3306/test_tbl
database.user=root
database.password=root
insert.mode=upsert
pk.mode=kafka
auto.create=true
auto.evolve=true

我遇到的问题是:由于遗留系统,我无法更改消息格式。所以我的消息是没有模式信息的 JSON 对象。库是否支持映射字段?例如从数据库下的字段 A 到字段 B 的映射。

谢谢

标签: mysqljdbcapache-kafkaapache-kafka-connect

解决方案


必须为您的数据声明一个架构才能使用 JDBC Sink。这意味着在实践中您需要:

如果您在将数据生成到 Kafka 时没有该选项,则可以构建应用模式的流处理阶段。您可以使用 Kafka Streams 或 KSQL 之类的东西来做到这一点。其输出是一个 Kafka 主题,然后您将其用作 Kafka Connect 的源。在 KSQL 中执行此操作的一个示例是:

-- Declare the schema of the source JSON topic
CREATE STREAM send_1_src (KEY1 VARCHAR, 
                          KEY2 INT) 
  WITH (KAFKA_TOPIC='send_1', 
        VALUE_FORMAT='JSON');

-- Run a continuous query populating the target topic `SEND_1_AVRO` 
-- with the data from `send_1` reserialised into Avro
CREATE STREAM SEND_1_AVRO 
  WITH (VALUE_FORMAT='AVRO') AS 
  SELECT * 
    FROM send_1_src;


推荐阅读