python - Apache NiFi:ExecuteStreamCommand 生成两个流文件
问题描述
我目前正在ExecuteStreamCommand
使用 PYthon 遇到 Apache NiFi 的问题。我有一个脚本,它读取 csv 并将其转换为 pandas-Dataframes,然后转换为 JSON。由于列的命名不一致,该脚本将 csv 文件拆分为多个 DataFrame。我当前的脚本如下所示:
import pandas as pd
import sys
input = sys.stdin.readlines()
#searching for subfiles and saving them to a list with files ...
appendDataFrames = []
for dataFrames in range(len(files)):
df = pd.DataFrame(files[dataFrame])
#several improvements of DataFrame ...
appendDataFrames.append(df)
output = pd.concat(appendDataFrames)
JSONOutPut = output.to_json(orient='records', date_format='iso', date_unit='s')
sys.stdout.write(JSONOutPut)
在我的下一个处理器的队列中,我现在可以看到一个流文件为 JSON(如预期的那样)。我的问题是,是否可以在单独的 FlowFiles 中编写每个 JSON,以便我的下一个处理器能够分开处理它们?我需要这样做,因为下一个处理器是 aInferAvroSchema
并且因为所有 JSON 都有不同的模式,所以这是没有机会的。我弄错了吗?或者有没有可能解决这个问题的方法?
下面的代码将不起作用,因为它无论如何都在同一个流文件中,我InferAvroSchema
无法处理这个分离。
import pandas as pd
import sys
input = sys.stdin.readlines()
#searching for subfiles and saving them to a list with files ...
appendDataFrames = []
for dataFrames in range(len(files)):
df = pd.DataFrame(files[dataFrame])
#several improvements of DataFrame ...
JSONOutPut = df.to_json(orient='records', date_format='iso', date_unit='s')
sys.stdout.write(JSONOutPut)
提前致谢!
解决方案
使用 ExecuteStreamCommand 您不能拆分输出,因为您必须写入标准输出。
但是,您可以将一些定界符写入输出并使用SplitContent
与下一个处理器相同的定界符。
推荐阅读
- apache-kafka - 如何输出kafka消费者属性?
- c# - C# URI 验证
- java - 如何修复关于我的 switch 语句的输出?
- google-cloud-platform - 如何通过 API 获取服务帐户对 GCP 中项目和组织的所有角色/权限
- monitoring - 使用凭据在 netscaler 中进行监控
- django - django-tenant 和 djangorestframework 测试
- python - setMargin() 方法不适用于 Qt Python 中的 QVBoxLayout
- vba - ACCESS VBA 如何将多个表中的行添加到列表框
- php - PHP 长三元运算符行和 120 字符行长 PSR 规则
- python - 如何删除由一系列特定值组成的行