apache-nifi - 在 Apache Nifi 中创建自定义处理器
问题描述
我正在构建一个自定义处理器来处理流文件,以处理我需要从本地文件系统读取 CSV 文件的流文件。我创建了一个属性描述符 CSV_PATH 如下
public static final PropertyDescriptor CSV_PATH = new
PropertyDescriptor
.Builder().name("CSV Path")
.displayName("CSV Path")
.description("CSV Path Reader")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new
ArrayList<PropertyDescriptor>();
descriptors.add(JSON_PATH);
descriptors.add(CSV_PATH);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
现在我想在配置处理器时获取 UI 中设置的 CSV_PATH 属性的值。我无法获得 CSV_PATH 值。此外,如果我在代码中硬编码文件路径,那么我仍然无法从本地文件系统读取 CSV。
解决方案
您想使用以下代码从 中检索 的PropertyDescriptor
值ProcessContext
:
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final String csvPath = context.getProperty(CSV_PATH).getValue();
// Do something with csvPath
}
如果您决定在该属性描述符中支持NiFi 表达式语言,您还需要对此进行评估:
final String csvPath = context.getProperty(CSV_PATH).evaluateAttributeExpressions().getValue();
还有其他方法覆盖,包括流文件属性、变量注册表、自定义装饰器等。
这记录在Apache NiFi 开发人员指南中。我最近在 2019 年巴塞罗那 Dataworks 峰会上做了一个演讲,涵盖了定制处理器开发,其中包括一些最佳实践和可能有用的示例。您还可以查看 NiFi 代码库中的任何现有处理器以查看示例。
推荐阅读
- spring - Spring Data 多个 jpa-named-queries.properties 文件
- javascript - 如何使用标准浏览器 API 计算相对 URL
- apache-nifi - FlowFile 内容与 FlowFile 属性的最佳实践
- angular - Angular Route Param 值未与 ngClass 三元绑定
- node.js - 如何在 azure web 应用程序中隐藏 node_modules
- google-chrome - chrome 扩展可以访问用户提交的控制台命令吗?
- json - 如何更新雪花中的半结构化数据?
- java - 如何使用 ImageView 而不是菜单项制作底部导航栏?
- java - 为什么 Spring boot 不接受我的 Axios POST 请求?
- distributed - 我可以将文件块存储在 IPFS 网络中的不同系统中吗?