首页 > 解决方案 > 无法在 Oracle 数据库中使用 SMT 转换提取字段

问题描述

我无法执行 SMT 转换“ExtractField”,以便使用 Oracle 数据库将字段从键结构中提取为简单的长值。它适用于 Postgres 数据库。

我尝试使用“ReplaceField”SMT 重命名密钥,它工作正常。我怀疑“org.apache.kafka.connect.transforms.ExtractField”类中存在有关模式处理以获取该字段的问题。“ReplaceField”和“ExtractField”之间的模式处理似乎不同。

Oracle 数据库版本:Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - 生产版本 19.8.0.0.0 Debezium 连接:1.6 Kafka 版本:2.7.0 Instanclient basic(Oracle 客户端和驱动程序):21.3.0.0.0

我得到了一个“未知字段 ID_MYTABLE”:

org.apache.kafka.connect.errors.ConnectException:在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206) 在 org.apache.kafka.connect.runtime 的错误处理程序中超出了容差.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132) at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50) at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask .java:339) 在 org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264) 在 org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) 在 org. apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234) 在 java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) 在 java.base/java.util。concurrent.FutureTask.run(FutureTask.java:264) 在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run( ThreadPoolExecutor.java:628) 在 java.base/java.lang.Thread.run(Thread.java:834) 引起:java.lang.IllegalArgumentException:未知字段: org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50) 中的 org.apache.kafka.connect.transforms.ExtractField.apply( ExtractField.java:65 ) .apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190) ... 还有 11 个

这是我的 Kafka 连接器的配置:

{
  "name": "oracle-connector",  
  "config": {   
    "connector.class": "io.debezium.connector.oracle.OracleConnector", 
    "tasks.max": "1", 
    "database.server.name": "serverName", 
    "database.user": "c##dbzuser", 
    "database.password": "dbz", 
    "database.url": "jdbc:oracle:thin:...", 
    "database.dbname": "dbName", 
    "database.pdb.name": "PDBName", 
    "database.connection.adapter": "logminer", 
    "database.history.kafka.bootstrap.servers": "kafka:9092", 
    "database.history.kafka.topic": "schema-changes.data", 
    "schema.include.list": "mySchema", 
    "table.include.list": "mySchema.myTable", 
    "log.mining.strategy": "online_catalog", 
    "snapshot.mode": "initial", 
    "key.converter": "org.apache.kafka.connect.json.JsonConverter", 
    "key.converter.schemas.enable": "false", 
    "value.converter": "io.confluent.connect.avro.AvroConverter", 
    "value.converter.schemas.enable": "true", 
    "value.converter.schema.registry.url": "http://schema-registry:8081", 
    "transforms": "unwrap,route,extractField",
    "transforms.extractField.type": "org.apache.kafka.connect.transforms.ExtractField$Key", 
    "transforms.extractField.field": "ID_MYTABLE", 
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", 
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", 
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)", 
    "transforms.route.replacement": "$1_$2_$3" 
  } 
}

标签: oracleapache-kafkaapache-kafka-connectdebezium

解决方案


推荐阅读