java - NIFI ExecuteScript 1.10.0 处理器:java.io.IOException:标记无效
问题描述
我将 ExecuteScript 处理器与 python 一起使用。它曾经可以工作,但今天我写了两千多行脚本,问题发生了。日志如下,我认为是BufferedReader方法标记/重置导致了这个问题!
2020-05-13 17:21:49,433 ERROR [Timer-Driven Process Thread-87] o.a.nifi.processors.script.ExecuteScript ExecuteScript[id=b015d494-32cb-35f7-92b2-268a30ff730b] Failed to process session due to org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: java.io.IOException: java.io.IOException: Mark invalid: org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: java.io.IOException: java.io.IOException: Mark invalid
org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: java.io.IOException: java.io.IOException: Mark invalid
at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:254)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: javax.script.ScriptException: java.io.IOException: java.io.IOException: Mark invalid
at org.python.jsr223.PyScriptEngine.scriptException(PyScriptEngine.java:226)
at org.python.jsr223.PyScriptEngine.compileScript(PyScriptEngine.java:93)
at org.python.jsr223.PyScriptEngine.eval(PyScriptEngine.java:31)
at javax.script.AbstractScriptEngine.eval(AbstractScriptEngine.java:264)
at org.apache.nifi.script.impl.JythonScriptEngineConfigurator.eval(JythonScriptEngineConfigurator.java:62)
at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:238)
... 11 common frames omitted
Caused by: org.python.core.PyException: null
at org.python.core.Py.JavaError(Py.java:552)
at org.python.core.ParserFacade.fixParseError(ParserFacade.java:107)
at org.python.core.ParserFacade.parseExpressionOrModule(ParserFacade.java:136)
at org.python.util.PythonInterpreter.compile(PythonInterpreter.java:320)
at org.python.util.PythonInterpreter.compile(PythonInterpreter.java:316)
at org.python.util.PythonInterpreter.compile(PythonInterpreter.java:308)
at org.python.jsr223.PyScriptEngine.compileScript(PyScriptEngine.java:87)
... 15 common frames omitted
Caused by: java.io.IOException: Mark invalid
at java.io.BufferedReader.reset(BufferedReader.java:512)
at org.python.core.ParserFacade.parseExpressionOrModule(ParserFacade.java:133)
... 19 common frames omitted
脚本示例代码如下,我模拟我的脚本,'for循环'会一直在进行,原始脚本将到达第2473行
import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
def __init__(self, flowfile):
self.ff = flowfile
pass
def process(self, inputStream, outputStream):
prop_col_names = ['NOROUTE_FLAG','DIP_TOTALITEM','INPUTDATE','MODEL_ALIAS','LINE_RATE','PWB_NO','WMS_SEND_FLAG','UNIT_PRICE','DIP_CHECK_DATE','LINK_FLAG','PCB_QTY','MODEL_TYPE','MP_FLAG','YHP_APP','ONENESS_QTY','SMT_CAR_QTY','SMT_TOTALITEM','KP_NO','MODEL_VER','PALLET_SNTYPEID','VALID_FLAG','LINE_ROUTE','CHECK_FLAG','REDUCETONORMAL','WUNIT','CHANGE_DATE','PRODUCT_NAME','AUTO_SN','SSN_MODEL','SSN_DEFINE','SMT_TOTALSOLDER','CREATE_DATE','PACK_DATE','PICK_DATE','PCB_QTY_FLAG','GWEIGHT','SB_MODEL','MO_TYPE','QTY_CONTROL_FLAG','SSN_TYPE','PCBA_VER','NWEIGHT','CLASS_NAME','DEFAULT_LINE','MODEL_RANGE2','MODEL_RANGE1','ROUTE_CODE','SAP_MODEL_FLAG','EXTRA_QTY','CATEGORY','POINT_A','POINT_B','PROD_TYPE','START_SN','BOM_NO','MODEL_PARENT','STANDARD','KEYIN_EMPCODE','DIP_TOTALSOLDER','MODEL_ATTRIBUTE','PCB_CODE','PCB_VER','PCB_TYPE_ID','TYPE_NUMBER','BIOS_VER','SERIES_NAME','MFG_PART','CHANGE_EMP','END_SN','NORMALTOADD','PB_FLAG','SP_MODEL_FLAG','SSN_LENGTH','CREATE_EMP','LABEL_CONTROL_FLAG']
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
obj = json.loads(text)
system_id = self.ff.getAttribute('system_id')
source_schema = self.ff.getAttribute('source_schema')
source_table = self.ff.getAttribute('source_table')
work_master_cap_prop = []
work_master_cap = []
m_work_master_cap_s_work_master_cap_prop_by_work_master_cap_prop = []
for row in obj:
main_id = row['MODEL_NAME']
source_schema = row.get('owner', source_schema)
source_table = row.get('table', source_table)
op = row.get('op', 'i')
scn = row.get('scn', -1)
train_id = row.get('train_id', '')
train_seq = row.get('train_seq', -1)
rowid = row.get('rowid', '')
load_seq = row.get('load_seq', -1)
row_status = 'I'
for col_name in prop_col_names:
d = {}
m = {}
if col_name == 'NOROUTE_FLAG':
d = {
'ACTION_FLAG': op,
'TXN_ID': train_id,
'TXN_SEQ': train_seq,
'ACTION_SEQ': load_seq,
'GROUP_KEY': main_id,
'Value': row['NOROUTE_FLAG'],
'ID': 'NOROUTE_FLAG',
'ROW_STATUS': row_status,
'GROUP_TABLE': source_schema + '.' + source_table,
'R_ID': rowid,
'SYSTEM_ID': system_id,
'SCN': scn
}
m = {
'ACTION_FLAG': op,
'TXN_ID': train_id,
'FID_WORK_MASTER_CAP_PROP_SYSTEM_ID': system_id,
'PID_WORK_MASTER_CAP_SYSTEM_ID': system_id,
'TXN_SEQ': train_seq,
'ACTION_SEQ': load_seq,
'PID_WORK_MASTER_CAP_ID': row['MODEL_NAME'],
'FID_WORK_MASTER_CAP_PROP_ID': 'NOROUTE_FLAG',
'ROW_STATUS': row_status,
'R_ID': rowid,
'PID_WORK_MASTER_CAP_GROUP_KEY': main_id,
'FID_WORK_MASTER_CAP_PROP_GROUP_TABLE': source_schema + '.' + source_table,
'FID_WORK_MASTER_CAP_PROP_GROUP_KEY': main_id,
'PID_WORK_MASTER_CAP_GROUP_TABLE': source_schema + '.' + source_table,
'SCN': scn
}
elif col_name == 'DIP_TOTALITEM':
d = {
'ACTION_FLAG': op,
'TXN_ID': train_id,
'TXN_SEQ': train_seq,
'ACTION_SEQ': load_seq,
'GROUP_KEY': main_id,
'Value': row['DIP_TOTALITEM'],
'ID': 'DIP_TOTALITEM',
'ROW_STATUS': row_status,
'GROUP_TABLE': source_schema + '.' + source_table,
'R_ID': rowid,
'SYSTEM_ID': system_id,
'SCN': scn
}
m = {
'ACTION_FLAG': op,
'TXN_ID': train_id,
'FID_WORK_MASTER_CAP_PROP_SYSTEM_ID': system_id,
'PID_WORK_MASTER_CAP_SYSTEM_ID': system_id,
'TXN_SEQ': train_seq,
'ACTION_SEQ': load_seq,
'PID_WORK_MASTER_CAP_ID': row['MODEL_NAME'],
'FID_WORK_MASTER_CAP_PROP_ID': 'DIP_TOTALITEM',
'ROW_STATUS': row_status,
'R_ID': rowid,
'PID_WORK_MASTER_CAP_GROUP_KEY': main_id,
'FID_WORK_MASTER_CAP_PROP_GROUP_TABLE': source_schema + '.' + source_table,
'FID_WORK_MASTER_CAP_PROP_GROUP_KEY': main_id,
'PID_WORK_MASTER_CAP_GROUP_TABLE': source_schema + '.' + source_table,
'SCN': scn
}
work_master_cap_prop.append(d)
m_work_master_cap_s_work_master_cap_prop_by_work_master_cap_prop.append(m)
work_master_cap_info = {
'ACTION_FLAG': op,
'TXN_ID': train_id,
'DESCRIPTION': row['MODEL_SERIAL'],
'TXN_SEQ': train_seq,
'ACTION_SEQ': load_seq,
'GROUP_KEY': main_id,
'ID': row['MODEL_NAME'],
'ROW_STATUS': row_status,
'GROUP_TABLE': source_schema + '.' + source_table,
'R_ID': rowid,
'SYSTEM_ID': system_id,
'SCN': scn
}
work_master_cap.append(work_master_cap_info)
data = {
'work_master_cap_prop': work_master_cap_prop,
'work_master_cap': work_master_cap,
'm_work_master_cap_s_work_master_cap_prop_by_work_master_cap_prop': m_work_master_cap_s_work_master_cap_prop_by_work_master_cap_prop
}
outputStream.write(bytearray(json.dumps(data, separators=(',', ':')).encode('utf-8')))
flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile, PyStreamCallback(flowFile))
session.transfer(flowFile, REL_SUCCESS)
如果我想使用自定义处理器来处理 python 脚本,有什么建议吗?
解决方案
推荐阅读
- swiftui - SwiftUI:弹出或关闭视图控制器
- html - 如何在反应中画线
- c# - 无法从身份注册页面访问外部控制器 ActionResults
- angular - AOT 的 Angular 8 BrowserAnimations 错误
- python - 将数据框的第一行复制到具有相同列的空数据框
- java - 在 IntelliJ 中,如何运行 maven 构建、执行生成的应用程序并一步连接调试器?
- hash - 我可以对这段代码使用折叠(或其他类型的减少)吗?
- c# - 如何在作为类属性的谓词中使用类属性?
- androidx - Android Studio 在构建 gradle 时迁移到 androidx 后卡在 :app:transformdexarchivewithdexmergerfordebug
- python - 每个测试的 Python unittest 单独的 setup() 方法