首页 > 解决方案 > NiFi 1.10.0 ExecuteScript 处理器:java.io.IOException: Mark invalid

问题描述

我在 ExecuteScript 处理器中使用 Python(Jython)脚本。之前一直运行正常,但今天脚本写到两千多行之后就出现了问题。日志如下,我认为是 BufferedReader 的 mark/reset 方法导致了这个问题!

2020-05-13 17:21:49,433 ERROR [Timer-Driven Process Thread-87] o.a.nifi.processors.script.ExecuteScript ExecuteScript[id=b015d494-32cb-35f7-92b2-268a30ff730b] Failed to process session due to org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: java.io.IOException: java.io.IOException: Mark invalid: org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: java.io.IOException: java.io.IOException: Mark invalid
org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: java.io.IOException: java.io.IOException: Mark invalid
    at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:254)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: javax.script.ScriptException: java.io.IOException: java.io.IOException: Mark invalid
    at org.python.jsr223.PyScriptEngine.scriptException(PyScriptEngine.java:226)
    at org.python.jsr223.PyScriptEngine.compileScript(PyScriptEngine.java:93)
    at org.python.jsr223.PyScriptEngine.eval(PyScriptEngine.java:31)
    at javax.script.AbstractScriptEngine.eval(AbstractScriptEngine.java:264)
    at org.apache.nifi.script.impl.JythonScriptEngineConfigurator.eval(JythonScriptEngineConfigurator.java:62)
    at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:238)
    ... 11 common frames omitted
Caused by: org.python.core.PyException: null
    at org.python.core.Py.JavaError(Py.java:552)
    at org.python.core.ParserFacade.fixParseError(ParserFacade.java:107)
    at org.python.core.ParserFacade.parseExpressionOrModule(ParserFacade.java:136)
    at org.python.util.PythonInterpreter.compile(PythonInterpreter.java:320)
    at org.python.util.PythonInterpreter.compile(PythonInterpreter.java:316)
    at org.python.util.PythonInterpreter.compile(PythonInterpreter.java:308)
    at org.python.jsr223.PyScriptEngine.compileScript(PyScriptEngine.java:87)
    ... 15 common frames omitted
Caused by: java.io.IOException: Mark invalid
    at java.io.BufferedReader.reset(BufferedReader.java:512)
    at org.python.core.ParserFacade.parseExpressionOrModule(ParserFacade.java:133)
    ... 19 common frames omitted

示例脚本如下,它模拟了我的原始脚本:'for 循环'里的内容还会继续写下去,原始脚本长达 2473 行。

import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
    """Rewrite a flow file's JSON content into three lists of records.

    The incoming content is parsed as a JSON array of row objects. For every
    row, one "property" record and one "relation" record are produced per
    column name, plus a single master record per row. The three lists are
    written back as one compact JSON object.

    The per-column records are built from a single template instead of one
    hand-written ``if``/``elif`` branch per column, which keeps the script
    short no matter how many columns are listed.
    """

    def __init__(self, flowfile):
        """Remember the flow file so ``process`` can read its attributes."""
        self.ff = flowfile

    def process(self, inputStream, outputStream):
        """Read JSON rows from ``inputStream`` and write the result to ``outputStream``.

        Raises ``KeyError`` when a row lacks ``MODEL_NAME``, ``MODEL_SERIAL``
        or any of the listed column names.
        """
        prop_col_names = ['NOROUTE_FLAG','DIP_TOTALITEM','INPUTDATE','MODEL_ALIAS','LINE_RATE','PWB_NO','WMS_SEND_FLAG','UNIT_PRICE','DIP_CHECK_DATE','LINK_FLAG','PCB_QTY','MODEL_TYPE','MP_FLAG','YHP_APP','ONENESS_QTY','SMT_CAR_QTY','SMT_TOTALITEM','KP_NO','MODEL_VER','PALLET_SNTYPEID','VALID_FLAG','LINE_ROUTE','CHECK_FLAG','REDUCETONORMAL','WUNIT','CHANGE_DATE','PRODUCT_NAME','AUTO_SN','SSN_MODEL','SSN_DEFINE','SMT_TOTALSOLDER','CREATE_DATE','PACK_DATE','PICK_DATE','PCB_QTY_FLAG','GWEIGHT','SB_MODEL','MO_TYPE','QTY_CONTROL_FLAG','SSN_TYPE','PCBA_VER','NWEIGHT','CLASS_NAME','DEFAULT_LINE','MODEL_RANGE2','MODEL_RANGE1','ROUTE_CODE','SAP_MODEL_FLAG','EXTRA_QTY','CATEGORY','POINT_A','POINT_B','PROD_TYPE','START_SN','BOM_NO','MODEL_PARENT','STANDARD','KEYIN_EMPCODE','DIP_TOTALSOLDER','MODEL_ATTRIBUTE','PCB_CODE','PCB_VER','PCB_TYPE_ID','TYPE_NUMBER','BIOS_VER','SERIES_NAME','MFG_PART','CHANGE_EMP','END_SN','NORMALTOADD','PB_FLAG','SP_MODEL_FLAG','SSN_LENGTH','CREATE_EMP','LABEL_CONTROL_FLAG']
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)

        # Flow file attributes act as defaults; a row may override the
        # schema/table through its own 'owner' / 'table' keys.
        system_id = self.ff.getAttribute('system_id')
        source_schema = self.ff.getAttribute('source_schema')
        source_table = self.ff.getAttribute('source_table')

        work_master_cap_prop = []
        work_master_cap = []
        m_work_master_cap_s_work_master_cap_prop_by_work_master_cap_prop = []

        for row in obj:
            main_id = row['MODEL_NAME']
            # An override found on one row stays in effect for later rows
            # that carry no 'owner' / 'table' key of their own.
            source_schema = row.get('owner', source_schema)
            source_table = row.get('table', source_table)
            op = row.get('op', 'i')
            scn = row.get('scn', -1)
            train_id = row.get('train_id', '')
            train_seq = row.get('train_seq', -1)
            rowid = row.get('rowid', '')
            load_seq = row.get('load_seq', -1)
            row_status = 'I'
            # Identical for every record built from this row.
            group_table = source_schema + '.' + source_table

            # NOTE(review): assumes every column uses the same record layout
            # as the NOROUTE_FLAG / DIP_TOTALITEM branches this loop replaces;
            # a column that had no branch previously produced two empty dicts.
            for col_name in prop_col_names:
                work_master_cap_prop.append({
                    'ACTION_FLAG': op,
                    'TXN_ID': train_id,
                    'TXN_SEQ': train_seq,
                    'ACTION_SEQ': load_seq,
                    'GROUP_KEY': main_id,
                    'Value': row[col_name],
                    'ID': col_name,
                    'ROW_STATUS': row_status,
                    'GROUP_TABLE': group_table,
                    'R_ID': rowid,
                    'SYSTEM_ID': system_id,
                    'SCN': scn
                })
                m_work_master_cap_s_work_master_cap_prop_by_work_master_cap_prop.append({
                    'ACTION_FLAG': op,
                    'TXN_ID': train_id,
                    'FID_WORK_MASTER_CAP_PROP_SYSTEM_ID': system_id,
                    'PID_WORK_MASTER_CAP_SYSTEM_ID': system_id,
                    'TXN_SEQ': train_seq,
                    'ACTION_SEQ': load_seq,
                    'PID_WORK_MASTER_CAP_ID': row['MODEL_NAME'],
                    'FID_WORK_MASTER_CAP_PROP_ID': col_name,
                    'ROW_STATUS': row_status,
                    'R_ID': rowid,
                    'PID_WORK_MASTER_CAP_GROUP_KEY': main_id,
                    'FID_WORK_MASTER_CAP_PROP_GROUP_TABLE': group_table,
                    'FID_WORK_MASTER_CAP_PROP_GROUP_KEY': main_id,
                    'PID_WORK_MASTER_CAP_GROUP_TABLE': group_table,
                    'SCN': scn
                })

            # One master record per input row.
            work_master_cap.append({
                'ACTION_FLAG': op,
                'TXN_ID': train_id,
                'DESCRIPTION': row['MODEL_SERIAL'],
                'TXN_SEQ': train_seq,
                'ACTION_SEQ': load_seq,
                'GROUP_KEY': main_id,
                'ID': row['MODEL_NAME'],
                'ROW_STATUS': row_status,
                'GROUP_TABLE': group_table,
                'R_ID': rowid,
                'SYSTEM_ID': system_id,
                'SCN': scn
            })

        data = {
            'work_master_cap_prop': work_master_cap_prop,
            'work_master_cap': work_master_cap,
            'm_work_master_cap_s_work_master_cap_prop_by_work_master_cap_prop': m_work_master_cap_s_work_master_cap_prop_by_work_master_cap_prop
        }
        # Compact separators keep the serialized payload free of padding.
        outputStream.write(bytearray(json.dumps(data, separators=(',', ':')).encode('utf-8')))


# `session` and `REL_SUCCESS` are not defined in this script; they are
# expected to be supplied by the ExecuteScript processor at run time.
flowFile = session.get()
# Nothing to do when no flow file was returned.
if flowFile is not None:
    # Replace the content via the callback, then route the result to success.
    flowFile = session.write(flowFile, PyStreamCallback(flowFile))
    session.transfer(flowFile, REL_SUCCESS)

如果我想使用自定义处理器来处理 python 脚本,有什么建议吗?

标签: java apache-nifi

解决方案


推荐阅读