apache-spark - 如何从 NiFi 中的 GetFilesProcessor 读取文件
问题描述
以下是我的流程:
GetFile > ExecuteSparkInteractive > PutFile
我想从GetFile
处理器中的ExecuteSparkInteractive
处理器读取文件,应用一些转换并将其放在某个位置。下面是我的流程
spark scala code
我在code
火花处理器部分写道:
val sc1=sc.textFile("local_path")
sc1.foreach(println)
流程中没有发生任何事情。那么如何使用 GetFile 处理器读取 spark 处理器中的文件。
第二部分:
我尝试了以下流程只是为了练习:
ExecuteScript > PutFile > LogMessage
我在执行脚本处理器中提到了以下代码:
readFile = open("/home/cloudera/Desktop/sample/data","r")
for line in readFile:
lines = line.strip()
finalline = re.sub(pattern='((?<=[0-9])[0-9]|(?<=\.)[0-9])',repl='X',string=lines)
readFile = open("/home/cloudera/Desktop/sample/data","w")
readFile.write(finalline)
代码工作正常,但它不会将格式化的数据写入目标文件夹。那么我在这里哪里出错了。另外,我在本地机器上安装了 pandas 并从 executescript 处理器运行了 pandas 代码,但 nifi 不读取 pandas 模块。为什么会这样?我已经尽力了。另外,我找不到任何相关链接,我可以在其中获得基本流程
解决方案
这不是真正的工作方式...... GetFile 正在提取 NiFi 节点本地的文件并将它们带入 NiFi 流进行处理。ExecuteSparkInteractive 在远程 Spark 集群上启动一个 Spark 作业,它不会将数据传输到 Spark。因此,您可能希望将数据放在 Spark 可以访问的地方,例如 GetFile -> PutHDFS -> ExecuteSparkInteractive。
推荐阅读
- android - TextAppearance.A 继承自 TextAppearance.AB 不会导致 android 样式中的循环引用?
- nutch - nutch 在索引之前替换解析的内容
- javascript - JavaScript中长数的计算
- react-native - Fetch post method is not working - react native
- android - 如何解决FTP获取excel文件崩溃的问题?
- firebase - Realtime Firebase User id Fetching Problem
- mysql - 如何动态选择列并汇总mysql中的所有大小
- javascript - 如何将输入标签(type='month')的值转换为字符串,如“yyyy-MM”
- qt - QML:在一个页面中加载 50 多个组件
- javascript - 如何在javascript中从Hashmap中删除重复对象