首页 > 解决方案 > 如何在没有连接器的情况下创建正确的 kafka-connect 插件?

问题描述

我尝试为我的数据创建带有“转换”的插件到 kafka-connect 并将其与不同的接收器连接器一起使用。当我安装插件时,kafka-connect 看不到我的课程。

我使用 kafka-connect maven 插件来创建我的包 zip。使用 confluent-hub(从本地文件)安装已成功。

所有文件都已解压缩,我的工作人员属性已更新 plugin.paths。我以分布式模式运行我的连接,并尝试使用我的包中的变压器创建新的连接器。

我的插件结构如下:

- mwojdowski-my-connect-plugin-0.0.1-SNAPSHOT
|- manifest.json
|- lib
||- my-connect-plugin-0.0.1-SNAPSHOT.jar

和我的 manifest.json 文件:

{
  "name" : "my-connect-plugin",
  "version" : "0.0.1-SNAPSHOT",
  "title" : "my-connect-plugin",
  "description" : "A set of transformations for Kafka Connect",
  "owner" : {
    "username" : "mwojdowski",
    "name" : "Marcin Wojdowski<mwojdowski@gmail.com>"
  },
  "tags" : [ "transform", "field", "topic" ],
  "features" : {
    "supported_encodings" : [ "any" ],
    "single_message_transforms" : true,
    "confluent_control_center_integration" : true,
    "kafka_connect_api" : true
  },
  "documentation_url" : "",
  "docker_image" : { },
  "license" : [ {
    "name" : "Confluent Software License",
    "url" : "https://www.confluent.io/software-evaluation-license"
  } ],
  "component_types" : [ "transform" ],
  "release_date" : "2019-08-29"
}

接下来,我尝试创建新的连接器:

curl -XPOST -H 'Content-type:application/json' 'localhost:8083/connectors' -d '{
    "name" : "custom-file-sink-with-validation",
    "config" : {
    "connector.class" : "FileStreamSink",
        "tasks.max" : "1",
        "topics" : "test_topic",
        "file" : "/tmp/my-plugin-test.txt",
        "key.ignore" : "true",
        "schema.ignore" : "true",
        "drop.invalid.message": "false",
        "behavior.on.malformed.documents": "warn",
        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "value.converter":"org.apache.kafka.connect.storage.StringConverter",
        "transforms" : "Validation",
        "transforms.Validation.type" : "org.kafka.connect.my.connector.ValidateId"
    }
}'

重新启动 kafka 连接后,当我尝试创建新连接器时,抛出异常:

{
    "error_code": 400,
    "message": "Connector configuration is invalid and contains the following 2 error(s):\nInvalid value org.kafka.connect.my.connector.ValidateId for configuration transforms.Validation.type: Class org.kafka.connect.my.connector.ValidateId could not be found.\nInvalid value null for configuration transforms.Validation.type: Not a Transformation\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}

我也尝试手动安装插件,遵循文档: https ://docs.confluent.io/current/connect/managing/install.html

但看起来,Connect 没有加载我的罐子。

当我将我的 jar 复制到 share/java/kafka 时,它可以工作,但这不是解决方案。

我怀疑我的插件被跳过了,因为它不包含连接器。在这种情况下,我应该手动将我的 jar 添加到类路径吗?(与https://docs.confluent.io/current/connect/userguide.html#installing-plugins相反)

或者我应该明确指出我的连接器配置以尝试使用我的插件?

问候,M。

标签: javamavenapache-kafkaapache-kafka-connect

解决方案


对不起,问题真的很微不足道。在重构过程中,其中一个包最后得到“'s”,我错过了在 config.xml 中更新它。

"transforms.Validation.type" : "org.kafka.connect.my.connectors.ValidateId"

代替

"transforms.Validation.type" : "org.kafka.connect.my.connector.ValidateId"

我在从独立切换到分布式之前对其进行了重构。再次抱歉让您担心,感谢您的支持。

问候, 马辛


推荐阅读