apache-nifi - NiFi如何释放流文件直到下游流程完成
问题描述
我正在使用 NiFi 设计数据摄取模式。一个流程需要停止发布流文件,直到下游流程完成处理。我尝试使用等待并通知并没有取得任何成功。我希望队列大小和背压可以跨几个处理器设置。
同样,如果有一种方法可以实现逻辑:如果当前在多个处理器之间进行处理,则不允许流文件进入。
解决方案
您需要 MonitorActivity 与 executestreamcommand 的组合(使用 python“nipyapi”脚本)。
我的一个工作流程中有类似的要求。
您需要先安装 python lib nipyapi 并在 nifi 盒子上创建此脚本。
from time import sleep
import nipyapi
nipyapi.utils.set_endpoint('http://ipaddress:port/nifi-api', ssl=False, login=False)
## Get PG ID using the PG Name
mypg = nipyapi.canvas.get_process_group('start')
nipyapi.canvas.schedule_process_group(mypg.id, scheduled=True) ## Start
sleep(1)
nipyapi.canvas.schedule_process_group(mypg.id, scheduled=False) ## Stop
我将把模板放在下面链接的 img 中,查看监视器活动处理器上的配置 - 如果 10 秒内没有活动发生,它将生成一个流(你可以玩时间)。 下载模板
注意:如果您有高延迟要求,这不是一个很好的方法。
另一个想法是监视整个流中的聚合队列,如果队列为零,则重新启动启动流。(如果您有很多联系,这将非常激烈)
推荐阅读
- ios - tableview滚动得到错误的最后一个单元格
- dns - Odoo 10 限制嵌入到表单视图中的树视图中的记录
- mysql - 选择表之间的计数
- java - 将数据发布到谷歌文本到语音时收到的 Http 状态 400 接收到无效的 JSON 有效负载
- python - Openpyxl - 单元格操作
- javascript - 无法使用 axios 和 vue.js 加载本地 json 文件
- javascriptcore - 如何将 JSObjectRef 转换为 JSValueRef
- sql - Node.js & SQL Server Sequelize 加入表
- c++ - 在带有模板的结构中,为什么左值会推导出为右值?
- sql-server - 生成备份脚本。脚本中的无效符号