java - 如何在没有连接器的情况下创建正确的 kafka-connect 插件?
问题描述
我尝试为我的数据创建带有“转换”的插件到 kafka-connect 并将其与不同的接收器连接器一起使用。当我安装插件时,kafka-connect 看不到我的课程。
我使用 kafka-connect maven 插件来创建我的包 zip。使用 confluent-hub(从本地文件)安装已成功。
所有文件都已解压缩,我的工作人员属性已更新 plugin.paths。我以分布式模式运行我的连接,并尝试使用我的包中的变压器创建新的连接器。
我的插件结构如下:
- mwojdowski-my-connect-plugin-0.0.1-SNAPSHOT
|- manifest.json
|- lib
||- my-connect-plugin-0.0.1-SNAPSHOT.jar
和我的 manifest.json 文件:
{
"name" : "my-connect-plugin",
"version" : "0.0.1-SNAPSHOT",
"title" : "my-connect-plugin",
"description" : "A set of transformations for Kafka Connect",
"owner" : {
"username" : "mwojdowski",
"name" : "Marcin Wojdowski<mwojdowski@gmail.com>"
},
"tags" : [ "transform", "field", "topic" ],
"features" : {
"supported_encodings" : [ "any" ],
"single_message_transforms" : true,
"confluent_control_center_integration" : true,
"kafka_connect_api" : true
},
"documentation_url" : "",
"docker_image" : { },
"license" : [ {
"name" : "Confluent Software License",
"url" : "https://www.confluent.io/software-evaluation-license"
} ],
"component_types" : [ "transform" ],
"release_date" : "2019-08-29"
}
接下来,我尝试创建新的连接器:
curl -XPOST -H 'Content-type:application/json' 'localhost:8083/connectors' -d '{
"name" : "custom-file-sink-with-validation",
"config" : {
"connector.class" : "FileStreamSink",
"tasks.max" : "1",
"topics" : "test_topic",
"file" : "/tmp/my-plugin-test.txt",
"key.ignore" : "true",
"schema.ignore" : "true",
"drop.invalid.message": "false",
"behavior.on.malformed.documents": "warn",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"transforms" : "Validation",
"transforms.Validation.type" : "org.kafka.connect.my.connector.ValidateId"
}
}'
重新启动 kafka 连接后,当我尝试创建新连接器时,抛出异常:
{
"error_code": 400,
"message": "Connector configuration is invalid and contains the following 2 error(s):\nInvalid value org.kafka.connect.my.connector.ValidateId for configuration transforms.Validation.type: Class org.kafka.connect.my.connector.ValidateId could not be found.\nInvalid value null for configuration transforms.Validation.type: Not a Transformation\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}
我也尝试手动安装插件,遵循文档: https ://docs.confluent.io/current/connect/managing/install.html
但看起来,Connect 没有加载我的罐子。
当我将我的 jar 复制到 share/java/kafka 时,它可以工作,但这不是解决方案。
我怀疑我的插件被跳过了,因为它不包含连接器。在这种情况下,我应该手动将我的 jar 添加到类路径吗?(与https://docs.confluent.io/current/connect/userguide.html#installing-plugins相反)
或者我应该明确指出我的连接器配置以尝试使用我的插件?
问候,M。
解决方案
对不起,问题真的很微不足道。在重构过程中,其中一个包最后得到“'s”,我错过了在 config.xml 中更新它。
"transforms.Validation.type" : "org.kafka.connect.my.connectors.ValidateId"
代替
"transforms.Validation.type" : "org.kafka.connect.my.connector.ValidateId"
我在从独立切换到分布式之前对其进行了重构。再次抱歉让您担心,感谢您的支持。
问候, 马辛