oracle - ORA-01747: Nifi PutDatabaseRecord 上的 user.table.column 无效
问题描述
我尝试使用 Nifi 更新 Oracle 数据库中的一些数据库列。
我有这样的电路部分:
我对最后一个有问题PutDatabaserecord
:
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | 2021-07-12 17:34:20,919 ERROR [Timer-Driven Process Thread-2] o.a.n.p.standard.PutDatabaseRecord PutDatabaseRecord[id=017a10b4-fe2c-1b89-f752-67545ebb8406] Failed to put Records to database for StandardFlowFileRecord[uuid=db4a0554-6ea8-4cca-b1da-11bdacef1ccc,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1625839380170-6543, container=default, section=399], offset=292967, length=68],offset=0,name=f4d1875f-bec4-4944-ad8e-7955048148f3,size=68]. Routing to failure.: java.sql.BatchUpdateException: ORA-01747: invalid user.table.column, table.column, or column specification
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain |
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | java.sql.BatchUpdateException: ORA-01747: invalid user.table.column, table.column, or column specification
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain |
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:10032)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1364)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9839)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:234)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at sun.reflect.GeneratedMethodAccessor156.invoke(Unknown Source)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at java.lang.reflect.Method.invoke(Method.java:498)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:254)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.access$100(StandardControllerServiceInvocationHandler.java:38)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:240)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at com.sun.proxy.$Proxy149.executeBatch(Unknown Source)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.processors.standard.PutDatabaseRecord.executeDML(PutDatabaseRecord.java:754)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.processors.standard.PutDatabaseRecord.putToDatabase(PutDatabaseRecord.java:841)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.processors.standard.PutDatabaseRecord.onTrigger(PutDatabaseRecord.java:487)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1173)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at java.lang.Thread.run(Thread.java:748)
这是有问题的节点的配置:
这是 RecordReader 的架构:
{
"name": "load_date",
"type": "record",
"namespace": "maxi",
"fields": [
{
"name": "doc_id",
"type": "int"
},
{
"name": "line_id",
"type": "int"
},
{
"name": "load_date",
"type": "string"
}
]
}
这是到达节点的 json 数据示例:
[{"doc_id":1795576199,"line_id":689617855,"load_date":"2021-34-12"}]
更新
好的,我将 PutDatabaseRecord 设置为调试模式,评估单一记录?从处理器捕获完整的调试信息。这是处理器开始处理记录时的日志头:
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | 2021-07-13 14:15:23,951 INFO [NiFi Web Server-456] o.a.n.c.s.StandardProcessScheduler Starting SplitJson[id=9f810155-017a-1000-890c-4b1e382a161e]
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | 2021-07-13 14:15:23,951 INFO [NiFi Web Server-456] o.a.n.controller.StandardProcessorNode Starting SplitJson[id=9f810155-017a-1000-890c-4b1e382a161e]
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | 2021-07-13 14:15:23,964 INFO [Timer-Driven Process Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled SplitJson[id=9f810155-017a-1000-890c-4b1e382a161e] to run with 1 threads
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | 2021-07-13 14:15:23,976 ERROR [Timer-Driven Process Thread-1] o.a.n.p.standard.PutDatabaseRecord PutDatabaseRecord[id=017a10b4-fe2c-1b89-f752-67545ebb8406] Failed to put Records to database for StandardFlowFileRecord[uuid=9c53fd8b-775b-42a6-bcae-fb4208a40985,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1626174735665-61, container=default, section=61], offset=776075, length=330],offset=0,name=51510f97-7a54-4394-966a-12775653acb0,size=66]. Routing to failure.: java.sql.SQLDataException: Cannot map field 'doc_id' to any column in the database
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | Columns:
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | java.sql.SQLDataException: Cannot map field 'doc_id' to any column in the database
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | Columns:
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | at org.apache.nifi.processors.standard.PutDatabaseRecord.generateUpdate(PutDatabaseRecord.java:1073)
(我在那里添加了另一个处理器 - SplitJson
)
而且,我在我自己的数据库方案中添加了一些表。
这是 DDL:
CREATE TABLE psheom.ml_task (
doc_id number,
line_id number,
load_date DATE,
CONSTRAINT pk_ml_task PRIMARY KEY(doc_id, line_id)
)
这是以下结果DESCRIBE psheom.ml_task
:
Name Null? Type
--------- -------- ------
DOC_ID NOT NULL NUMBER
LINE_ID NOT NULL NUMBER
LOAD_DATE DATE
更新
我尝试按照@pmdba 的建议制作 ExecuteSql 处理器,但出现了一些错误。这是配置:
这是错误:
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | 2021-07-13 14:57:19,137 ERROR [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.ExecuteSQL ExecuteSQL[id=9fb97e49-017a-1000-bae9-256f569c239b] Unable to execute SQL select query describe psheom.ml_task due to java.sql.SQLSyntaxErrorException: ORA-00900: invalid SQL statement
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | . No FlowFile to route to failure: java.sql.SQLSyntaxErrorException: ORA-00900: invalid SQL statement
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain |
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | java.sql.SQLSyntaxErrorException: ORA-00900: invalid SQL statement
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain |
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:494)
更新
我设置Translate Field Names
为false
,但结果相同。
解决方案
好的,所以我们需要UPDATE
为Oracle
.
首先不要使用 DBeaver!这是非常错误的!仅使用 SQLDeveloper 来处理 Oracle。
其次,在 Nifi 端为连接池配置相同的用户创建会话,并DESCRIBE
为需要更新的表运行:
DESCRIBE GOODS.ML_TASK
确定,成功了!
第三,我们需要大写所有实体名称,并且最好设置Translate Field Names
为false
,因此它会删除下划线,即:doc_id -> docid
。
第五,您需要json
在处理器的输入中使用大写的字段名称:
[
{"DOC_ID":1799041400,"LINE_ID":694098344,"LOAD_DATE":"14-Jul-21"},
{"DOC_ID":1802019315,"LINE_ID":697885808,"LOAD_DATE":"14-Jul-21"}
]
如果您在某个地方有小写字段,请使用UpdateAttribute
带有读取器和写入器的处理器,其中读取器将大写字段:
为 PutDatabaseRecord 读取器定义的模式必须大写:
{
"name": "load_date",
"type": "record",
"namespace": "maxi",
"fields": [
{
"name": "DOC_ID",
"type": "int"
},
{
"name": "LINE_ID",
"type": "int"
},
{
"name": "LOAD_DATE",
"type": "string"
}
]
}
所以在这里你有:
UpdateRecord
:
- IncomingJsonReader 接受小写的记录,并转为大写
- 传入JsonWriter
PutDatanaseRecord
- 传出JsonReader
他们都使用相同的 JSON SCHEMA
第五,如您所见,使用string
类型作为日期。
您可以将日期设置为正弦日期的毫秒数,但是,尽管它在我自己的模式中有效,但在生产模式中却不起作用。所以需要查询NLS_DATE_FORMAT
。
将适当的查询放入ExecuteSQL
处理器中:
select * from nls_session_parameters where parameter = 'NLS_DATE_FORMAT'
您将在结果队列中获得格式:
[{"PARAMETER": "NLS_DATE_FORMAT", "VALUE": "DD-MON-RR"}]
使用它以适当的方式格式化日期。
第六,如我们所见,将正确的日期格式放在“表达式语言”表达式中的正确位置,例如,UpdateRecord
在我的例子中:
第七,确保只有一条或几条记录通过了队列,以便能够监控日志。还要输入PutDatabaseRecord
模式DEBUG
:
通过这种方式,它会通知您是否成功从数据库中获取模式。
所以...祝你甲骨文的表更新愉快!
推荐阅读
- javascript - 为什么在外部范围内等待内联变量与简单的等待分配不同?
- c - 将结构保存到文件中,然后在同一个程序中读取它
- reporting-services - 数量和外部价格乘以三?
- javascript - 如何在 GTM 中使用自定义变量
- dart - 如何在服务器的 dart 中实现 grpc 拦截器?
- r - 我们如何在 R 的多线图中混合线型?
- python-3.x - 如何使用多个工作人员和负载均衡器设置 Flask + Flask-SocketIO + Gunicorn + Nginx
- xamarin - Xamarin Forms 有哪些好的分析工具?
- php - 如何在 OVH cloud-web-1 主机上运行 PHP websocket,
- r - 比较 t 检验和 mean() 函数时组均值的不同输出