首页 > 解决方案 > kafka sink 连接器无法连接到 Couchbase

问题描述

我在本地机器 Ubuntu VM 上运行 Apache Kafka 和 Couchbase。我有一个从 MS SQL 读取的 SourceConnector,以及一个将写入 Couchbase 的接收器连接器。我目前在使用接收器连接器时遇到问题。

当我配置接收器连接器并检查其状态时,它正在运行。但是,在我更新我的 SQL 数据库中的记录并检查连接器状态后,我得到了这个异常:

在此处输入图像描述

我已经验证数据正在流入 Kafka 主题和流。唯一不起作用的部分是写入 Couchbase。(Couchbase 没有更新,而且我在连接器状态中遇到异常)。

这是我的连接器 JSON 配置:

{
"connector.class":"com.couchbase.connect.kafka.CouchbaseSinkConnector",
"tasks.max":"1",
"topics":"weconnect-customers-sink",
"connection.cluster_address":"127.0.0.1",
"connection.ssl.enabled":"false",
"connection.bucket":"accounts",
"connection.username":"Administrator",
"connection.password":"Couchbase",
"couchbase.durability.persist_to":"NONE",
"couchbase.durability.replicate_to":"NONE",
"couchbase.remove.document.id": "true",
"couchbase.document.id": "${/id}",
"auto.offset.reset":"latest",

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",

"transforms": "joltFields,replaceFields",

"transforms.joltFields.transformType": "chainr",
"transforms.joltFields.transformSpec": "[{\"operation\":\"shift\",\"spec\":{\"*\":\"&\",\"CHANNELS_*\":\"CHANNELS[]\",\"ADDRESS_CITY\": \"ADDRESS.CITY\",\"ADDRESS_COUNTRYCODE\":\"ADDRESS.COUNTRYCODE\",\"ADDRESS_POSTALCODE\":\"ADDRESS.POSTALCODE\",\"ADDRESS_REGION\":\"ADDRESS.REGION\",\"ADDRESS_STREETHOUSENUMBER\":\"ADDRESS.STREETHOUSENUMBER\", \"COMPANYCODE\":\"COMPANY.CODE\",\"CUSTOMERTYPE\":\"CUSTOMERTYPE.CODE\", \"DOCUMENT_TYPE\": \"DOCUMENTTYPE\"}}]",
"transforms.joltFields.type": "com.pvh.kafka.connect.transforms.JoltFields",

"couchbase.n1ql.clause.fields": "documentType:customer,company.code,customerNumber",
"couchbase.n1ql.clause": "WHERE",
"couchbase.n1ql.operation": "UPSERT",
"couchbase.document.mode": "N1QL",
"couchbase.subdocument.create_document": "false",
"couchbase.create_document": "true",

"transforms.replaceFields.type": "com.pvh.kafka.connect.transforms.ReplaceFields",
"transforms.replaceFields.schema": "{\"type\": \"record\",\"name\": \"CustomersValue\",\"namespace\": \"com.pvh.digitalshowroom\",\"fields\": [{\"name\": \"channels\",\"type\": [ \"null\",{\"type\": \"array\",\"items\": \"string\"}],\"default\": null,\"aliases\": [ \"CHANNELS\" ]},{\"name\": \"id\",\"type\": \"string\",\"aliases\": [ \"id\",\"ID\"]},{\"name\": \"customerNumber\",\"type\": \"string\",\"aliases\": [ \"customerNumber\",\"CUSTOMERNUMBER\"]},{\"name\": \"name\",\"type\": \"string\",\"aliases\": [ \"NAME\"]},{\"name\": \"languageCode\",\"type\": \"string\",\"aliases\": [ \"languagecode\",\"LANGUAGECODE\"]},{\"name\": \"phone\",\"type\": \"string\",\"aliases\": [ \"PHONE\"]},{\"name\": \"remarks\",\"type\": \"string\",\"aliases\": [ \"REMARKS\"]},{\"name\": \"vatCode\",\"type\": \"string\",\"aliases\": [ \"vatcode\",\"VATCODE\"]},{\"name\": \"address\",\"type\":{\"type\": \"record\",\"namespace\": \"com.pvh.digitalshowroom\",\"name\": \"address\",\"fields\": [{\"name\": \"city\",\"type\": \"string\",\"aliases\": [ \"CITY\",\"city\"]},{\"name\": \"postalCode\",\"type\": \"string\",\"aliases\": [ \"POSTALCODE\"]},{\"name\": \"region\",\"type\": \"string\",\"aliases\": [ \"REGION\"]},{\"name\": \"streetHouseNumber\",\"type\": \"string\",\"aliases\": [ \"STREETHOUSENUMBER\"]},{\"name\": \"countryCode\",\"type\": \"string\",\"aliases\": [ \"COUNTRYCODE\"]}]},\"aliases\": [ \"ADDRESS\"]},{\"name\": \"company\",\"type\":{\"type\": \"record\",\"namespace\": \"com.pvh.digitalshowroom\",\"name\": \"company\",\"fields\": [{\"name\": \"code\",\"type\": \"string\",\"aliases\": [ \"CODE\",\"COMPANYCODE\"]}]},\"aliases\": [ \"COMPANY\"]},{\"name\": \"customerType\",\"type\":{\"type\": \"record\",\"namespace\": \"com.pvh.digitalshowroom\",\"name\": \"customerType\",\"fields\": [{\"name\": \"code\",\"type\": \"string\",\"aliases\": [ \"CODE\",\"CUSTOMERTYPE\"]}]},\"aliases\": [ \"CUSTOMERTYPE\"]},{\"name\": \"doumentType\",\"type\": \"string\",\"aliases\": [ \"DOCUMENTTYPE\"]}]}"

}

谁能告诉我我的配置有什么问题?此配置在先前版本的连接器 (3.3.0) 中有效,但在当前版本 (3.4.4) 中无效。我唯一改变的是:

标签: apache-kafkacouchbase

解决方案


表示连接器的类路径中的某处NoSuchMethodError可能存在不兼容版本的 Coucbasecore-io或库。java-client确保安装目录中没有这些库的旧版本(或连接器 jar 的旧版本)。


推荐阅读