mysql - 如何配置 Debezium Mysql 连接器以生成原始键而不是 struct 或 json 对象?
问题描述
我正在使用 Debezium 来检测 MySql 源表中的更改。如何生成 Kafka 消息,使得键是数字 ( Long
) 值而不是 Json 对象?
我得到了什么:
key: {"foo_id": 123}
value: {"foo_id": 123, "bar": "blahblah", "baz": "meh......"}
我想要的是:
key: 123
value: {"foo_id": 123, "bar": "blahblah", "baz": "meh......"}
我的 FOO 表如下所示:
foo_id: INT
bar: VARCHAR
baz: VARCHAR
请注意,我没有使用 avro,并且我尝试了以下几种组合(带和不带密钥转换器),但未能获得Long
密钥。
"transforms": "unwrap,insertKey,extractKey",
"transforms.unwrap.type":"io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones":"false",
"transforms.insertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.insertKey.fields":"foo_id",
"transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field":"foo_id",
"key.converter" : "org.apache.kafka.connect.converters.LongConverter",
"key.converter.schemas.enable": "false",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
我不确定 ValueToKey 或 ExtractField 是否适用于(MySQL)源,但我低于 NPE。
Caused by: java.lang.NullPointerException
at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:44)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
解决方案
找到了基于此https://issues.jboss.org/browse/DBZ-689的解决方案
{
...
"config": {
"transforms": "unwrap,insertKey,extractKey",
"transforms.unwrap.type":"io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones":"false",
"transforms.insertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.insertKey.fields":"foo_id",
"transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field":"foo_id",
"key.converter" : "org.apache.kafka.connect.converters.IntegerConverter",
"key.converter.schemas.enable": "true",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"include.schema.changes": "false" <-- this was missing
}
}
现在,我将 foo_id 视为一个Integer
(没什么大不了的Long
):)
推荐阅读
- google-cloud-platform - Google Cloud Platform 的 Prometheus 服务发现 - 不是有效的主机名错误
- reactjs - Rails API 用户身份验证接受来自 React 代理的 api 调用,但不直接来自主机 url
- powershell - 如何通过powershell将单列(具有多个值)拆分为多值
- javascript - Javascript 无法正确排序包含两个数组值的数组
- python - Pybars 不编译 jinja if 条件和 for 循环
- python - 是否可以在作为 tor 隐藏服务运行的服务器和客户端之间建立 python 套接字连接?
- sharepoint - Power Apps/Sharepoint,修改后Item不会改变
- android - 在所有新项目错误 ContextWrapper not found 但是当我更改目标并将 compileSdkVersion 更改为 30 时它工作正常,那么为什么会发生 sdk31
- python - 将浮点数数组转换为元组数组,其中浮点数作为元组条目之一
- flutter - 更改 API 数据