首页 > 解决方案 > 调试复杂 NiFi 数据流的理想方法

问题描述

根据我在使用 NiFi 构建一些 DB 摄取 PoC 后的理解,整个数据流作为流文件流运行。并且在任何特定时间,执行控制可以同时在一个或多个处理器上。

所以我真的很困惑如何为任何故障调试复杂的数据流。

我的 PoC 工作流程本身看起来像这样。 nifi 数据流

当我们使用生产用例时,它会变得比这复杂得多。所以我有几个问题。

  1. 如何知道数据流的状态。如果假设 10 个分叉流文件中有 4 个GenerateTableFetch因数据库池错误而失败,我怎么知道哪些失败了,以及如何快速重放它们,而无需去数据来源并一一进行。

  2. 有没有办法通过查看数据流来知道哪个处理器的流文件出现故障。

我对使用 NiFi 调试数据流有更多的疑虑/困惑,如果有人可以向我指出一些文档或分享最佳实践,那将很有帮助。

谢谢。

标签: apache-nifihortonworks-data-platformhortonworks-dataflow

解决方案


1-如何知道数据流的状态。如果假设 10 个分叉流文件中有 4 个在 GenerateTableFetch 中因数据库池错误而失败,我如何知道哪些失败以及如何快速重放它们而无需进行数据治理并一一进行。

您可以通过将类型故障或任何其他关系进行管理,具体取决于您使用的处理器类型发送到进程组以处理错误。

所以就像布莱恩提到的,你不希望他们自动终止,除非你不在乎。

2-有没有办法通过查看数据流来知道哪个处理器的流文件出现故障。

是的 - 您必须设置“公告级别”来区分日志级别

如何管理失败的 NiFi 流?

好吧,您需要与 BuletinBoard 成为最好的朋友,请参见此处SiteToSiteStatusReportingTask或者您可以使用InvokeHttp对本机NiFI Rest Api进行GET调用http://nifi-server:port/nifi-api/flow/bulletin-board和这个将响应一个详细的 json 对象,可以对其进行解析,然后将其推送到PutSlack/PutEmail/PutSNS中以查找任何错误。

拥有共享进程组来处理任何传入的错误流文件也是理想的,这个 PG 将使用规则和路由构建,以应用于 NiFi 服务器中的所有数据流逻辑。拥有将与所有流一起携带并在数据流过程中使用的 PG 特定属性至关重要。

例如:

进程组“Demo”有一个称为Set PG Attributes的处理器,它设置PGName属性、PGType属性、FailEmailTitle属性等。如果我的流程在任何时候失败,故障关系将根据Set PG Attributes处理器中设置的属性之一的值路由我的失败流程

这是我当前设置的图表,其中我将所有故障发送到同一个共享 PG。 在此处输入图像描述

其他选项

如果您认为公告仅持续 5 分钟是一个问题,那么您可以使用nifi-app.log,它可以设置为由 /opt/nifi/conf/logback.xml文件中的规则填充

  <logger name="org.apache.nifi" level="ERROR"/>
    <logger name="org.apache.nifi.processors" level="DEBUG"/>
    <logger name="org.apache.nifi.processors.standard.LogAttribute" level="ERROR"/>
    <logger name="org.apache.nifi.processors.standard.LogMessage" level="ERROR"/>
    <logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="ERROR" />

因此,您可以让 tailFile 处理器查看您的本地日志文件并获取错误信息或您认为对您有用的信息,并从中获得一些意义。


推荐阅读