oracle - 无法在 Oracle 数据库中使用 SMT 转换提取字段
问题描述
我无法执行 SMT 转换“ExtractField”,以便使用 Oracle 数据库将字段从键结构中提取为简单的长值。它适用于 Postgres 数据库。
我尝试使用“ReplaceField”SMT 重命名密钥,它工作正常。我怀疑“org.apache.kafka.connect.transforms.ExtractField”类中存在有关模式处理以获取该字段的问题。“ReplaceField”和“ExtractField”之间的模式处理似乎不同。
Oracle 数据库版本:Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - 生产版本 19.8.0.0.0 Debezium 连接:1.6 Kafka 版本:2.7.0 Instanclient basic(Oracle 客户端和驱动程序):21.3.0.0.0
我得到了一个“未知字段 ID_MYTABLE”:
org.apache.kafka.connect.errors.ConnectException:在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206) 在 org.apache.kafka.connect.runtime 的错误处理程序中超出了容差.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132) at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50) at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask .java:339) 在 org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264) 在 org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) 在 org. apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234) 在 java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) 在 java.base/java.util。concurrent.FutureTask.run(FutureTask.java:264) 在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run( ThreadPoolExecutor.java:628) 在 java.base/java.lang.Thread.run(Thread.java:834) 引起:java.lang.IllegalArgumentException:未知字段: org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50) 中的 org.apache.kafka.connect.transforms.ExtractField.apply( ExtractField.java:65 ) .apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190) ... 还有 11 个
这是我的 Kafka 连接器的配置:
{
"name": "oracle-connector",
"config": {
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"tasks.max": "1",
"database.server.name": "serverName",
"database.user": "c##dbzuser",
"database.password": "dbz",
"database.url": "jdbc:oracle:thin:...",
"database.dbname": "dbName",
"database.pdb.name": "PDBName",
"database.connection.adapter": "logminer",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.data",
"schema.include.list": "mySchema",
"table.include.list": "mySchema.myTable",
"log.mining.strategy": "online_catalog",
"snapshot.mode": "initial",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms": "unwrap,route,extractField",
"transforms.extractField.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractField.field": "ID_MYTABLE",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$1_$2_$3"
}
}
解决方案
推荐阅读
- javascript - 在javascript中,“for .. in”语句如何产生副作用?
- amazon-web-services - Terraform 无法使用静态 S3 网站端点创建 CloudFront 的源
- android - 添加另一个活动时应用程序崩溃
- jsp - 如何使用 servlet 处理异常,以便如果用户在这种情况下将字段留空,它将采用默认值
- symfony - 用于相同 server_name 下的项目的 Nginx 配置
- c# - 管理自动滚动
- amazon-web-services - 使用 lambda 函数的 AWS 弹性搜索更改策略
- java - Client Server tcp reset 导致丢包
- javascript - 为什么我们还要在 html 中使用表单元素?
- python - 带有 Apache 光束 TypeError 的云发布/订阅