apache-kafka - kafka sink 连接器无法连接到 Couchbase
问题描述
我在本地机器 Ubuntu VM 上运行 Apache Kafka 和 Couchbase。我有一个从 MS SQL 读取的 SourceConnector,以及一个将写入 Couchbase 的接收器连接器。我目前在使用接收器连接器时遇到问题。
当我配置接收器连接器并检查其状态时,它正在运行。但是,在我更新我的 SQL 数据库中的记录并检查连接器状态后,我得到了这个异常:
我已经验证数据正在流入 Kafka 主题和流。唯一不起作用的部分是写入 Couchbase。(Couchbase 没有更新,而且我在连接器状态中遇到异常)。
这是我的连接器 JSON 配置:
{
"connector.class":"com.couchbase.connect.kafka.CouchbaseSinkConnector",
"tasks.max":"1",
"topics":"weconnect-customers-sink",
"connection.cluster_address":"127.0.0.1",
"connection.ssl.enabled":"false",
"connection.bucket":"accounts",
"connection.username":"Administrator",
"connection.password":"Couchbase",
"couchbase.durability.persist_to":"NONE",
"couchbase.durability.replicate_to":"NONE",
"couchbase.remove.document.id": "true",
"couchbase.document.id": "${/id}",
"auto.offset.reset":"latest",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",
"transforms": "joltFields,replaceFields",
"transforms.joltFields.transformType": "chainr",
"transforms.joltFields.transformSpec": "[{\"operation\":\"shift\",\"spec\":{\"*\":\"&\",\"CHANNELS_*\":\"CHANNELS[]\",\"ADDRESS_CITY\": \"ADDRESS.CITY\",\"ADDRESS_COUNTRYCODE\":\"ADDRESS.COUNTRYCODE\",\"ADDRESS_POSTALCODE\":\"ADDRESS.POSTALCODE\",\"ADDRESS_REGION\":\"ADDRESS.REGION\",\"ADDRESS_STREETHOUSENUMBER\":\"ADDRESS.STREETHOUSENUMBER\", \"COMPANYCODE\":\"COMPANY.CODE\",\"CUSTOMERTYPE\":\"CUSTOMERTYPE.CODE\", \"DOCUMENT_TYPE\": \"DOCUMENTTYPE\"}}]",
"transforms.joltFields.type": "com.pvh.kafka.connect.transforms.JoltFields",
"couchbase.n1ql.clause.fields": "documentType:customer,company.code,customerNumber",
"couchbase.n1ql.clause": "WHERE",
"couchbase.n1ql.operation": "UPSERT",
"couchbase.document.mode": "N1QL",
"couchbase.subdocument.create_document": "false",
"couchbase.create_document": "true",
"transforms.replaceFields.type": "com.pvh.kafka.connect.transforms.ReplaceFields",
"transforms.replaceFields.schema": "{\"type\": \"record\",\"name\": \"CustomersValue\",\"namespace\": \"com.pvh.digitalshowroom\",\"fields\": [{\"name\": \"channels\",\"type\": [ \"null\",{\"type\": \"array\",\"items\": \"string\"}],\"default\": null,\"aliases\": [ \"CHANNELS\" ]},{\"name\": \"id\",\"type\": \"string\",\"aliases\": [ \"id\",\"ID\"]},{\"name\": \"customerNumber\",\"type\": \"string\",\"aliases\": [ \"customerNumber\",\"CUSTOMERNUMBER\"]},{\"name\": \"name\",\"type\": \"string\",\"aliases\": [ \"NAME\"]},{\"name\": \"languageCode\",\"type\": \"string\",\"aliases\": [ \"languagecode\",\"LANGUAGECODE\"]},{\"name\": \"phone\",\"type\": \"string\",\"aliases\": [ \"PHONE\"]},{\"name\": \"remarks\",\"type\": \"string\",\"aliases\": [ \"REMARKS\"]},{\"name\": \"vatCode\",\"type\": \"string\",\"aliases\": [ \"vatcode\",\"VATCODE\"]},{\"name\": \"address\",\"type\":{\"type\": \"record\",\"namespace\": \"com.pvh.digitalshowroom\",\"name\": \"address\",\"fields\": [{\"name\": \"city\",\"type\": \"string\",\"aliases\": [ \"CITY\",\"city\"]},{\"name\": \"postalCode\",\"type\": \"string\",\"aliases\": [ \"POSTALCODE\"]},{\"name\": \"region\",\"type\": \"string\",\"aliases\": [ \"REGION\"]},{\"name\": \"streetHouseNumber\",\"type\": \"string\",\"aliases\": [ \"STREETHOUSENUMBER\"]},{\"name\": \"countryCode\",\"type\": \"string\",\"aliases\": [ \"COUNTRYCODE\"]}]},\"aliases\": [ \"ADDRESS\"]},{\"name\": \"company\",\"type\":{\"type\": \"record\",\"namespace\": \"com.pvh.digitalshowroom\",\"name\": \"company\",\"fields\": [{\"name\": \"code\",\"type\": \"string\",\"aliases\": [ \"CODE\",\"COMPANYCODE\"]}]},\"aliases\": [ \"COMPANY\"]},{\"name\": \"customerType\",\"type\":{\"type\": \"record\",\"namespace\": \"com.pvh.digitalshowroom\",\"name\": \"customerType\",\"fields\": [{\"name\": \"code\",\"type\": \"string\",\"aliases\": [ \"CODE\",\"CUSTOMERTYPE\"]}]},\"aliases\": [ \"CUSTOMERTYPE\"]},{\"name\": \"doumentType\",\"type\": \"string\",\"aliases\": [ \"DOCUMENTTYPE\"]}]}"
}
谁能告诉我我的配置有什么问题?此配置在先前版本的连接器 (3.3.0) 中有效,但在当前版本 (3.4.4) 中无效。我唯一改变的是:
- 将 couchbase.document.id 更改为 ${/id} 。(以前是/id,文档说要改变这个)
- 添加了“connection.ssl.enabled”:“false”
- 为 connection.cluster_address 尝试了以下所有值(没有区别):127.0.0.1、localhost、10.0.2.15(本地 IP)、127.0.0.1:8091、localhost:8091、10.0.2.15:8091
- (已验证我可以在 127.0.0.1:8091 浏览到 Couchbase 管理 UI)
解决方案
表示连接器的类路径中的某处NoSuchMethodError
可能存在不兼容版本的 Coucbasecore-io
或库。java-client
确保安装目录中没有这些库的旧版本(或连接器 jar 的旧版本)。
推荐阅读
- android - Android SQLite onUpgrade add a new table without touching the old ones
- microsoft-graph-api - 如何按 Microsoft Graph 中的扩展值进行筛选?
- php - 计算产品过滤器中每个项目的产品
- wordpress - editing httpd.conf on apache
- indexing - Octave strread can't return parsed results to an array (?)
- python - 将枕头图像编码为 b64 导致无效的 base64 编码字符串
- powershell - How to add users in Active Directory with specific fields via a GUI
- ios - CAEmitterLayer 耗尽电池
- mysql - 如何使用带有“?”的 FIND_IN_SET() SSRS Report Builder 3.0 中的参数?
- .net - 默认格式化程序被自定义格式化程序覆盖