mysql - JDBC Sink 连接器:如何将 Kafka 消息中的字段映射到数据库表的列
问题描述
我正在使用Confluent JDBC Sink Connector来捕获从 Kafka 主题到数据库的所有更改。我的消息是没有任何附加架构的 JSON 格式。例如:
{ "key1": "value1", "key2": 100}
这是我的配置:
name=sink-mysql-1
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=send_1
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
database.hostname=jdbc:mysql://0.0.0.0:3306/test_tbl
database.user=root
database.password=root
insert.mode=upsert
pk.mode=kafka
auto.create=true
auto.evolve=true
我遇到的问题是:由于遗留系统,我无法更改消息格式。所以我的消息是没有模式信息的 JSON 对象。库是否支持映射字段?例如从数据库下的字段 A 到字段 B 的映射。
谢谢
解决方案
您必须为您的数据声明一个架构才能使用 JDBC Sink。这意味着在实践中您需要:
- 在 Avro 中生成数据
- 使用预期的模式/有效负载结构以 JSON 格式生成数据
如果您在将数据生成到 Kafka 时没有该选项,则可以构建应用模式的流处理阶段。您可以使用 Kafka Streams 或 KSQL 之类的东西来做到这一点。其输出是一个 Kafka 主题,然后您将其用作 Kafka Connect 的源。在 KSQL 中执行此操作的一个示例是:
-- Declare the schema of the source JSON topic
CREATE STREAM send_1_src (KEY1 VARCHAR,
KEY2 INT)
WITH (KAFKA_TOPIC='send_1',
VALUE_FORMAT='JSON');
-- Run a continuous query populating the target topic `SEND_1_AVRO`
-- with the data from `send_1` reserialised into Avro
CREATE STREAM SEND_1_AVRO
WITH (VALUE_FORMAT='AVRO') AS
SELECT *
FROM send_1_src;
- 要了解有关 KSQL 的更多信息,请参见此处。
- 您可以在此处的 Kafka 教程中找到一些使用原始 Kafka 消费者、Kafka Streams 与 KSQL 的流处理模式的优秀示例。
推荐阅读
- clojure - Clojure用接口参数调用java方法
- c - X11:无法在使用 XCreateWindow 创建的透明窗口上绘制图像
- c# - 如何在 DotNet 框架 1.1 中获取程序集名称?
- sql - IBM Maximo Where 子句
- wpf - 是否有与 CSS 的网格区域等效的 XAML?
- apache-kafka - 带有 statestore 的 Kafka 有状态流处理器:幕后
- ldap - 无法从 389 目录服务器检索 eduPerson 属性
- java - 尝试访问类中定义的 ArrayList 时出现 NoSuchFieldError
- rust - 有没有比在每个枚举值上使用重命名更优雅的方法来让 serde 生成所有值 PascalCase?
- python - 将另一个函数中的 matplotlib 图保存为图像但不显示该图