apache-spark - 如何管理 Nifi ExecuteSparkINteractive 处理器中的“等待”状态?
问题描述
我正在使用 Nifi ExecuteSparkInteractive 处理器运行火花代码,我看到它的结果是成功、失败和等待。当它进入成功和失败状态时,我能够完美地管理和路由结果,但有时我看到文件丢失了,我认为它将进入等待状态,并且在我的 Nifi 流程中没有为“等待”状态定义路由。谁能帮我理解究竟什么是“等待”状态,以及如何在不影响最终结果的情况下在这种状态下路由控件。
先感谢您!
我试图将 nifi 控件路由到 UpdateAttribute 处理器,但我没有看到 spark 代码的结果。看起来代码在 ExecuteSparkINteractive 的“等待”状态期间没有被执行。
没有针对此问题的代码,因为 Nifi 提供了处理器之间的复选框和连接控制器。
对于成功和失败条件以及等待状态,我已将控制从 ExecuteSparkInteractive 处理器传递给 RouteOnAttribute 进程,但在等待状态期间未执行火花代码。
Spark 代码应该在等待状态期间执行,然后它应该路由到成功和失败。
解决方案
我想出了 ExecuteSparkInteractive 处理器中“等待”状态的目的。
每当控制/文件流经 Execute spark 交互式处理器时,它都会从该处理器执行 spark 代码,但它会首先查找在 yarn 集群上运行的可用 Livy Spark 会话。如果没有可用的 Livy Spark 会话(可能是其他进程正在这些会话上运行),则控制进入等待状态。
理想情况下,ExecuteSparkInteractive 处理器正在等待 Livy spark 会话可用于执行代码。这就是如果未处理“等待”状态,代码不会执行的原因。
为了解决这个问题,我在“等待”状态下调用 ExecuteSparkInteractive 给它自己,基本上只要 Livy 会话可供处理器使用,就将控制循环回同一个处理器以运行 spark 代码。这种方法解决了我的问题。
我附上了我的模板截图以供参考。
推荐阅读
- c - 在struct中实现信号量,然后使用malloc()初始化struct,会不会出现futex错误?
- algorithm - 如果 f(n) 是 Ω(n∗g(n)),那么 f(n) 不是 O(g(n))
- c - 如何在C编程中以struct格式制作返回元素
- java - Eclipse 上的“ChromeDriver/WebDriver 无法恢复为类型”错误
- xcode - 使用带有 Cocoapods 的 Xcode 11.4 安装 Mongodb Realm 会导致错误
- c++ - 以最少的时间复杂度将一个向量复制到另一个向量?
- python - Pandas 遍历多索引
- pyspark - Pyspark 基于关闭时间差计算字段
- reactjs - React hooks async 问题:map 在数据返回之前执行 - 解决方案?
- javascript - 通过 JS 获取 web 服务器上目录的文件和内容