apache-nifi - 如何在 NiFi 的 python 类中传递流文件属性?
问题描述
目标:通过获取流文件属性,在 Python 中使用 executeScript 在 CSV 中添加文件名字段值。
问题:如何为我传递流文件以获取要包含在输出流写入中的属性
下面的示例代码无法获取属性文件名。
class PyStreamCallback(StreamCallback):
def __init__(self,flowFile):
self.ff = flowFile
pass
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
list_index = 0
textArr = []
newText=''
for t in text.splitlines():
list_index += 1
t= t + '|' + str(list_index) + '|"' + t + '"|' + self.ff.getAttribute('filename')
textArr.append(t)
newText = '\n'.join(textArr)
outputStream.write(bytearray(newText.encode('utf-8')))
flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile,PyStreamCallback())
session.transfer(flowFile, REL_SUCCESS)
解决方案
声明一个全局变量来保存文件名属性值。示例代码片段,
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript
from org.python.core.util.FileUtil import wrap
from io import StringIO
global file_name
# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
with wrap(inputStream) as f:
lines = f.readlines()
for line in lines:
line = line.strip() + '|' + file_name + '\n'
with wrap(outputStream, 'w') as filehandle:
filehandle.writelines("%s" % line for line in lines)
# end class
flowFile = session.get()
if (flowFile != None):
try
file_name = flowFile.getAttribute('filename')
flowFile = session.write(flowFile, PyStreamCallback())
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
excp = str(exc_type) + str(fname)+ str(exc_tb.tb_lineno)
attrMap = {'exception': str(excp)}
flowFile = session.putAllAttributes(flowFile , attrMap)
session.transfer(flowFile , ExecuteScript.REL_FAILURE)
推荐阅读
- string - HIVE在哪里日期过滤x天?字符串格式
- angular - Angular jest spectator 在测试另一个服务时模拟服务依赖
- php - wp插件中的排序表不起作用,缺少什么?
- php - 如何在 Laravel 中使用条带格式化 URL?
- mailgun - 为什么我不能使用 MailGun 以编程方式验证电子邮件?
- c++ - 在 3 个相同的 sscanf 调用中,中间的一个不起作用
- python - 将数据附加到 csv 文件中的行尾 (Python)
- python - 从 sqlalchemy 完全禁用日志
- kubernetes - 在 J-Meter 上创建数据作为负载
- javascript - 为什么我的 guildMemberAdd 不起作用?任何人都可以帮助我吗?