首页 > 解决方案 > 如何在 NiFi 的 python 类中传递流文件属性?

问题描述

目标:在 NiFi 的 ExecuteScript 处理器中用 Python(Jython)脚本读取流文件的 filename 属性,并把它作为一个字段追加到 CSV 内容的每一行。

问题:如何把流文件传入 StreamCallback 回调类,以便在写入输出流时获取并写入它的属性?

下面的示例代码无法获取 filename 属性。

class PyStreamCallback(StreamCallback):
    """Rewrite each content line as ``<line>|<n>|"<line>"|<filename>``.

    ``n`` is the 1-based line number and ``<filename>`` is the flow file's
    ``filename`` attribute. session.write() only passes the input/output
    streams to process(), so the flow file itself is handed to the
    constructor to make its attributes reachable.
    """

    def __init__(self, flowFile):
        # Keep a reference so process() can read the flow file's attributes.
        self.ff = flowFile

    def process(self, inputStream, outputStream):
        """Read the whole content as UTF-8, rewrite every line, write UTF-8."""
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        # The attribute does not change while processing; read it once.
        filename = self.ff.getAttribute('filename')
        textArr = [
            t + '|' + str(list_index) + '|"' + t + '"|' + filename
            for list_index, t in enumerate(text.splitlines(), 1)
        ]
        newText = '\n'.join(textArr)
        outputStream.write(bytearray(newText.encode('utf-8')))

flowFile = session.get()
if flowFile is not None:
    # session.write() only hands the streams to the callback, so pass the
    # flow file in explicitly; PyStreamCallback.__init__ requires it.
    flowFile = session.write(flowFile, PyStreamCallback(flowFile))
    # Transfer only when a flow file was actually obtained from the queue.
    session.transfer(flowFile, REL_SUCCESS)

标签: apache-nifi

解决方案


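可以声明一个模块级全局变量来保存 filename 属性值:在调用 session.write() 之前为它赋值,回调类的 process() 方法中直接读取它。也可以像问题中那样通过构造函数把 flowFile 传入回调类——问题代码失败的原因之一是创建实例时写成了 PyStreamCallback(),没有传入构造函数所需的 flowFile 参数。示例代码片段如下: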

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript
from org.python.core.util.FileUtil import wrap
from io import StringIO


# Module-level default for the current flow file's 'filename' attribute.
# A `global` statement at module scope binds nothing, so bind the name
# explicitly; the calling script assigns it before each session.write().
file_name = None

# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    """Append ``|<filename>`` to every line of the flow file content.

    The filename comes from the constructor argument when one is given;
    otherwise it is read from the module-level ``file_name`` global when
    process() runs.
    """

    def __init__(self, file_name=None):
        # Optional explicit filename; None means "use the global at process() time".
        self.file_name = file_name

    def process(self, inputStream, outputStream):
        """Read all lines, append the filename column, and write them back."""
        name = self.file_name if self.file_name is not None else file_name
        # NOTE: assumes Jython 2.7, where the wrapped stream's readlines() yields
        # byte strings while getAttribute() yields unicode. Encoding the name once
        # avoids an implicit ASCII decode of non-ASCII content lines.
        suffix = '|' + name.encode('utf-8') + '\n'
        with wrap(inputStream) as f:
            lines = f.readlines()
        # Build a new list: rebinding a for-loop variable would not modify `lines`.
        # rstrip('\r\n') drops only the line terminator; strip() would also remove
        # leading/trailing spaces that belong to the CSV data.
        new_lines = [line.rstrip('\r\n') + suffix for line in lines]
        with wrap(outputStream, 'w') as filehandle:
            filehandle.writelines(new_lines)


# end class
import os
import sys

flowFile = session.get()
if flowFile is not None:
    try:
        # Module-level assignment: PyStreamCallback.process() reads this global.
        file_name = flowFile.getAttribute('filename')
        flowFile = session.write(flowFile, PyStreamCallback())
        session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
    except Exception:
        # Record what failed and where, then route the flow file to failure.
        exc_type, exc_obj, exc_tb = sys.exc_info()
        fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        excp = '%s: %s (%s:%s)' % (exc_type, exc_obj, fname, exc_tb.tb_lineno)
        attrMap = {'exception': str(excp)}
        flowFile = session.putAllAttributes(flowFile, attrMap)
        session.transfer(flowFile, ExecuteScript.REL_FAILURE)

推荐阅读