apache-kafka - Kafka 将 Debezium 与 ms sql 服务器连接起来。密钥提取配置问题
问题描述
我正在使用 Debezium 和 kafka connect。我需要从下面的有效负载中提取“listid”,并能够将其分配给 kafka 消息键。使用我的连接器文件中的配置,我无法提取该值。感谢解决此问题的任何帮助。
{
"before": null,
"after": {
"listid": 19,
"billingid": "0",
"userid": "test",
连接器属性
key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
transforms=unwrap,extract
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.extractkey.type==org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractkey.field=listid
key.converter.schemas.enable=false
value.converter.schemas.enable=true
Sooooo 我正在继续进行实验,并希望分享对问题的更好解释以及我所做的实验。
跟随 Robin 的博客https://www.confluent.fr/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/
1- 从终端运行 kafka-avro 消费者
钥匙
{"LIST_ID":10058}
价值
{"before":null,"after":{"test.dbo.test.Value":{"LIST_ID":10058,"billingid
etc...
2-我试图让密钥成为值 10058
3 - 这就是我的转换配置
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
transforms=createKey,extractInt,
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=LIST_ID
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=LIST_ID
我在针对 mssql 服务器的管道中使用 debezium 连接器。
解决方案
Looks like a typo (extract
vs extractkey
):
transforms=unwrap,extractkey
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.extractkey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractkey.field=listid
推荐阅读
- python - 如何对齐不同的图 xticks/figure 大小(一个来自 matplotlib 的图,另一个来自 seaborn)
- asp.net-core - .cshtml 中的 Request.Form 出现问题
- r - 使用 R 程序二项式分布概率质量函数
- apache-spark - 无法通过在 jar 中打包 spark-avro_2.11 来加载 avro
- node.js - 如何使用工作目录到 .BAT 文件创建 SnoreToast 开始菜单快捷方式?
- c++ - 使用 C++ 剪切和重组文件中的数字
- python - 将 sklearn 随机森林模型转移到新服务器
- python - 嵌套循环中的嵌套列表
- python - 你能给 TypeVar 一个类型参数吗
- firebase - 检查在 Firestore 中阅读了多少次文档?