python - 如何使用 ExecuteScript 和 python 从 nifi 中的一个传入流文件创建多个流文件
问题描述
在本地运行,这完全符合我的要求(在位置 7-10 有一个带有许多不同代码的传入流文件,并且每个唯一代码输出 1 个文件)例如,如果记录 1-5 在位置 7-10 有 1234,并且记录 6 在位置 7-10 有 2345,记录 7 在位置 7-10 有 1234,然后会有一个名为 1234_file.txt 的文件包含第 1-5 行和第 7 行,第二个文件 2345_file.txt 将包含第 6 行输入文件:
f = open("test_comp.txt", "r")
for x in f:
comp = x[6:10]
print(comp)
n = open(comp+"_file.txt","a")
n.write(x)
n.close()
f.close()
在nifi中,我正在尝试这个:
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
f = open(inputStream, 'r')
for x in f:
comp = x[6:10]
print("comp: ",comp)
newFile = open(comp+"_file.txt","a")
newFile.write(x)
flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile, PyStreamCallback())
session.transfer(flowFile, REL_SUCCESS)
session.commit()
它似乎正在获取输入并将comp正确存储为预期的位置7-10,但我没有得到多个流文件(对于x [6:10]中的每个唯一字符串。流出的流文件是1零字节文件。
对我所缺少的有什么想法吗?
解决方案
您正在直接写入文件系统上的文件,而不是NiFi生态系统中的对象流文件。我建议阅读Apache NiFi 开发人员指南以了解这些模式的上下文,并查看一些Python ExecuteScript 示例以查看相关的 Python 代码。
您需要创建多个流文件对象,将数据映射到它们,然后将它们全部传输到各自的关系,而不是写出单个流文件。
您是否有理由需要使用自定义 Python 代码而不是SplitRecord
和/或PartitionRecord
处理器来执行此操作?我认为PartitionRecord
可以很容易地解决您描述的问题。
推荐阅读
- angular - 仅对特定 URL 禁用不区分大小写
- mysql - 在什么条件下应该创建新的 mysql 表?
- visual-studio - 如何快速评论和取消评论代码?
- rust - 如何使用 Rust 浏览 OPC UA 服务器的节点?
- python - Python:在第二个破折号之前按字符分组
- javascript - calc 没有在 HTMLbutton.onclick 中定义
- c++ - 突破 OpenMP 部分和任务
- java - 如何修复代码以显示来自 Firebase 的数据?
- mysql-workbench - 如何修复 MySQL Workbench 8 在 Ubuntu 18.10 上崩溃
- css - 在 Google 搜索中为“下一步”按钮找到正确的 xpath