首页 > 解决方案 > 在 Apache NiFi 中使用 AvroSchemaRegistry

问题描述

我有 5 种不同的CSVReader控制器服务。它们的配置是相同的,除了schema text(因为不同的标题)和 1 CSVRecordSetWriter

我想只留下一个CSVReaderschema text动态设置。我读过AvroSchemaRegistry但我不清楚如何使用它。

我应该创建 5 个AvroSchemaRegistry具有 2 个属性的不同控制器:namevalue?Fe 我想提出以下架构:

{
    "type": "record",
    "name": "campaigns",
    "namespace": "common",
    "fields": [
        {"name": "campaign_name", "type": "string"},
        {"name": "campaign_id", "type": "long"},
        {"name": "date", "type" : {"type": "int", "logicalType" : "date"}}
    ]
}

我应该创建AvroSchemaRegistry

对于另一个模式,我应该创建另一个AvroSchemaResgitry具有另一个属性的控制器name并且value

之后,如何配置CSVReaderCSVRecordSetWriter使用这些模式?最后,我应该如何处理流文件?添加其他属性?什么样的?

标签: apache-nifi

解决方案


像这样配置您的流程(根据您的要求进行更改),

流动

  1. UpdateAttribute配置以派生/硬编码流文件特定模式-

更新属性

  1. ValidateRecord配置以使用通用 csv 阅读器并动态传递模式 -

验证记录

  1. CSVReader控制器服务使用动态传递的模式并设置模式访问策略 -

CSVReader

如果您希望使用 NiFi 支持的模式注册表,则将所有模式放在注册表中,schema.nameaccess strategy为记录读取器/写入器设置属性以从注册表访问模式,但首先您需要在控制器服务中添加/配置模式注册表提供程序。

更新:

示例 SchemaRegistry 控制器服务配置HortonworksSchemaRegistry(ConfluentSchemaRegistry 和 AvroSchemaRegistry 具有或多或少相同的属性)。重要的是执行查找操作时注册表 API 期望的参数,因此我们只需要从 Reader/Writer 控制器服务Schema Name属性传递相同的值,请参考this了解一下。

架构注册表 URL:http ://example.com:7788/api/v1

HortonworksSchemaRegistry


推荐阅读