首页 > 解决方案 > 如何从 NiFi 中的 GetFilesProcessor 读取文件

问题描述

以下是我的流程:

GetFile > ExecuteSparkInteractive > PutFile

我想从GetFile处理器中的ExecuteSparkInteractive处理器读取文件,应用一些转换并将其放在某个位置。下面是我的流程 在此处输入图像描述

spark scala code我在code火花处理器部分写道:

val sc1=sc.textFile("local_path")
sc1.foreach(println)

流程中没有发生任何事情。那么如何使用 GetFile 处理器读取 spark 处理器中的文件。

第二部分:
我尝试了以下流程只是为了练习:

ExecuteScript > PutFile > LogMessage

我在执行脚本处理器中提到了以下代码:

readFile = open("/home/cloudera/Desktop/sample/data","r")
for line in readFile:
    lines = line.strip()
    finalline = re.sub(pattern='((?<=[0-9])[0-9]|(?<=\.)[0-9])',repl='X',string=lines)
readFile = open("/home/cloudera/Desktop/sample/data","w")
readFile.write(finalline)  

代码工作正常,但它不会将格式化的数据写入目标文件夹。那么我在这里哪里出错了。另外,我在本地机器上安装了 pandas 并从 executescript 处理器运行了 pandas 代码,但 nifi 不读取 pandas 模块。为什么会这样?我已经尽力了。另外,我找不到任何相关链接,我可以在其中获得基本流程

标签: apache-sparkhadoopbigdataapache-nifi

解决方案


这不是真正的工作方式...... GetFile 正在提取 NiFi 节点本地的文件并将它们带入 NiFi 流进行处理。ExecuteSparkInteractive 在远程 Spark 集群上启动一个 Spark 作业,它不会将数据传输到 Spark。因此,您可能希望将数据放在 Spark 可以访问的地方,例如 GetFile -> PutHDFS -> ExecuteSparkInteractive。


推荐阅读