首页 > 解决方案 > NiFi如何释放流文件直到下游流程完成

问题描述

我正在使用 NiFi 设计数据摄取模式。一个流程需要停止发布流文件,直到下游流程完成处理。我尝试使用等待并通知并没有取得任何成功。我希望队列大小和背压可以跨几个处理器设置。

同样,如果有一种方法可以实现逻辑:如果当前在多个处理器之间进行处理,则不允许流文件进入。

任何帮助表示赞赏 流程图

标签: apache-nifi

解决方案


您需要 MonitorActivity 与 executestreamcommand 的组合(使用 python“nipyapi”脚本)。

我的一个工作流程中有类似的要求。

在此处输入图像描述

您需要先安装 python lib nipyapi 并在 nifi 盒子上创建此脚本。

from time import sleep
import nipyapi

nipyapi.utils.set_endpoint('http://ipaddress:port/nifi-api', ssl=False, login=False)
## Get PG ID using the PG Name
mypg = nipyapi.canvas.get_process_group('start')
nipyapi.canvas.schedule_process_group(mypg.id, scheduled=True) ## Start
sleep(1)
nipyapi.canvas.schedule_process_group(mypg.id, scheduled=False) ## Stop

我将把模板放在下面链接的 img 中,查看监视器活动处理器上的配置 - 如果 10 秒内没有活动发生,它将生成一个流(你可以玩时间)。 下载模板

注意:如果您有高延迟要求,这不是一个很好的方法。

另一个想法是监视整个流中的聚合队列,如果队列为零,则重新启动启动流。(如果您有很多联系,这将非常激烈)


推荐阅读