首页 > 解决方案 > 在 Apache Nifi 中创建自定义处理器

问题描述

我正在构建一个自定义处理器来处理流文件,以处理我需要从本地文件系统读取 CSV 文件的流文件。我创建了一个属性描述符 CSV_PATH 如下

public static final PropertyDescriptor CSV_PATH = new 
PropertyDescriptor
.Builder().name("CSV Path")
.displayName("CSV Path")
.description("CSV Path Reader")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();



@Override

protected void init(final ProcessorInitializationContext context) {

final List<PropertyDescriptor> descriptors = new 
ArrayList<PropertyDescriptor>();

descriptors.add(JSON_PATH);

descriptors.add(CSV_PATH);

this.descriptors = Collections.unmodifiableList(descriptors);



final Set<Relationship> relationships = new HashSet<Relationship>();

relationships.add(SUCCESS);

this.relationships = Collections.unmodifiableSet(relationships);

}

现在我想在配置处理器时获取 UI 中设置的 CSV_PATH 属性的值。我无法获得 CSV_PATH 值。此外,如果我在代码中硬编码文件路径,那么我仍然无法从本地文件系统读取 CSV。

标签: apache-nifi

解决方案


您想使用以下代码从 中检索 的PropertyDescriptorProcessContext

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }

    final String csvPath = context.getProperty(CSV_PATH).getValue();

    // Do something with csvPath

}

如果您决定在该属性描述符中支持NiFi 表达式语言,您还需要对此进行评估:

final String csvPath = context.getProperty(CSV_PATH).evaluateAttributeExpressions().getValue();

还有其他方法覆盖,包括流文件属性、变量注册表、自定义装饰器等。

这记录在Apache NiFi 开发人员指南中。我最近在 2019 年巴塞罗那 Dataworks 峰会上做了一个演讲,涵盖了定制处理器开发,其中包括一些最佳实践和可能有用的示例。您还可以查看 NiFi 代码库中的任何现有处理器以查看示例。


推荐阅读