首页 > 解决方案 > Apache NiFi:ExecuteStreamCommand 生成两个流文件

问题描述

我目前正在ExecuteStreamCommand使用 PYthon 遇到 Apache NiFi 的问题。我有一个脚本,它读取 csv 并将其转换为 pandas-Dataframes,然后转换为 JSON。由于列的命名不一致,该脚本将 csv 文件拆分为多个 DataFrame。我当前的脚本如下所示:

import pandas as pd
import sys

input = sys.stdin.readlines()

#searching for subfiles and saving them to a list with files ...

appendDataFrames = []

for dataFrames in range(len(files)):
   df = pd.DataFrame(files[dataFrame])
   #several improvements of DataFrame ...
   appendDataFrames.append(df)

output = pd.concat(appendDataFrames)
JSONOutPut = output.to_json(orient='records', date_format='iso', date_unit='s')
sys.stdout.write(JSONOutPut)

在我的下一个处理器的队列中,我现在可以看到一个流文件为 JSON(如预期的那样)。我的问题是,是否可以在单独的 FlowFiles 中编写每个 JSON,以便我的下一个处理器能够分开处理它们?我需要这样做,因为下一个处理器是 aInferAvroSchema并且因为所有 JSON 都有不同的模式,所以这是没有机会的。我弄错了吗?或者有没有可能解决这个问题的方法?

下面的代码将不起作用,因为它无论如何都在同一个流文件中,我InferAvroSchema无法处理这个分离。

import pandas as pd
import sys

input = sys.stdin.readlines()

#searching for subfiles and saving them to a list with files ...

appendDataFrames = []

for dataFrames in range(len(files)):
   df = pd.DataFrame(files[dataFrame])
   #several improvements of DataFrame ...
   JSONOutPut = df.to_json(orient='records', date_format='iso', date_unit='s')
   sys.stdout.write(JSONOutPut)

提前致谢!

标签: pythonjsonapache-nifi

解决方案


使用 ExecuteStreamCommand 您不能拆分输出,因为您必须写入标准输出。

但是,您可以将一些定界符写入输出并使用SplitContent与下一个处理器相同的定界符。


推荐阅读