apache-kafka - Kafka JDBC Sink Connector with JSON Schema Registry
Problem description
I have a sink connector that writes to PostgreSQL, configured as follows.
{
  "name": "connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/postgres?user=user&password=123",
    "topics": "kafka-to-jsonb31",
    "insert.mode": "insert",
    "dialect.name": "PostgreSqlDatabaseDialect",
    "table.name.format": "sample_jsonb",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://localhost:8090/json-schema.txt",
    "auto.create": "false",
    "auto.evolve": "false",
    "pk.mode": "none",
    "pk.fields": "none",
    "transforms": "timestamp",
    "transforms.timestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.timestamp.target.type": "Timestamp",
    "transforms.timestamp.field": "timestamp",
    "transforms.timestamp.format": "yyyy-MM-dd HH:mm:ss"
  }
}
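This works fine if I include the schema in each Kafka message, but I would like to keep the schema definition in a separate file and send only the actual payload to the connector. To do that I tried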
"value.converter.schema.registry.url": "http://localhost:8090/json-schema.txt",
but it looks like the connector cannot fetch the schema, and it throws this exception:
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:328)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:540)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:496)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:496)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Below is the message I want the connector to receive:
{
  "data": "{\"name\":\"name11\",\"age\":26}",
  "timestamp": "2021-08-09 17:39:00"
}
Below is the content of the schema file:
{
  "schema": {
    "type": "struct",
    "fields": [{
      "field": "data",
      "type": "string",
      "optional": false
    }, {
      "field": "timestamp",
      "type": "string",
      "optional": false
    }]
  }
}
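For reference, the exception refers to the envelope that JsonConverter expects when schemas.enable is true: every message carries both a "schema" and a "payload" field. A sketch of such a message, built only from the schema and the payload shown in this question, would look like this:

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "data", "type": "string", "optional": false },
      { "field": "timestamp", "type": "string", "optional": false }
    ],
    "optional": false
  },
  "payload": {
    "data": "{\"name\":\"name11\",\"age\":26}",
    "timestamp": "2021-08-09 17:39:00"
  }
}
```

A bare payload without this wrapper is what triggers the DataException above.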
Solution
value.converter.schema.registry.url should contain only the base URL of the Schema Registry, not the path to a schema file:
"value.converter.schema.registry.url": "http://localhost:8090/"
As long as your Schema Registry already holds the schema, the converter can locate its subject through the Subject Name Strategy.
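As an illustration, a JSON Schema describing the message from the question might look like the sketch below. The title is a hypothetical name chosen for this example, and the draft version is an assumption. With the default TopicNameStrategy, the schema would be looked up under the subject kafka-to-jsonb31-value, that is, the topic name followed by -value.

```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "SampleJsonb",
  "type": "object",
  "properties": {
    "data": { "type": "string" },
    "timestamp": { "type": "string" }
  },
  "required": ["data", "timestamp"],
  "additionalProperties": false
}
```

Note that this is a standard JSON Schema document, not the Kafka Connect "struct" format used in the schema file from the question.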
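In addition, you must use the converter value.converter=io.confluent.connect.json.JsonSchemaConverter instead of org.apache.kafka.connect.json.JsonConverter. JsonSchemaConverter lets you use the Schema Registry with JSON messages, whereas JsonConverter requires the schema to be embedded in every message (the problem you are currently facing). See this article for more details.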
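Put together, the value converter settings in the connector config might look like the fragment below. This is a sketch that assumes the Schema Registry listens at http://localhost:8090, as in the question; all other settings stay as they were.

```json
{
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter.schema.registry.url": "http://localhost:8090"
}
```

Keep in mind that JsonSchemaConverter expects values written by Confluent's JSON Schema serializer, which prefixes each message with a schema ID. A producer that sends plain JSON text would therefore also need to switch to that serializer.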