首页 > 解决方案 > 将来自 NiFi 流的传入 Json 消息解析为 Hbase 表

问题描述

大家好,我从 NiFi 中的 Kafka 主题中收到一条消息流,我正在通过消费者进程读取该消息。消息格式为 json(虚拟 json 值,json 格式与原始格式相同):

{   "schema": {
    "type": "struct",
    "name": "emp_table",
    "fields": [
      {
        "field": "emp_id",
        "type": "string"
      },
      {
        "field": "emp_name",
        "type": "String"
      },
      {
        "field": "city",
        "type": "string"
      },
      {
        "field": "emp_sal",
        "type": "string"
      },
      {
        "field": "manager_name",
        "type": "string"
      }
    ]   },   "payload": {
    "emp_id": "1",
    "emp_name": "abc",
    "city": "NYK",
    "emp_sal": "100000",
    "manager_name": "xyz"   } }

正如您在此处看到的,实际表名位于 Schema 下,列值位于有效负载下。我可以通过在 NiFi 中使用 EvaluateJsonPath 和 PutHBaseJson 处理器来解析列值并将其放入 Hbase 表中。

我能够实现的是手动输入表名和rowid。但我的问题是我想从 json 中获取 tablename(在上面的示例 emp_table 中)和 rowid(在上面的示例 emp_id 中),并在运行时将这些值与列值一起提供给 NiFi 中的 PutHbaseJson 处理器。

在此处输入图像描述

标签: jsonparsinghbaseapache-nifi

解决方案


您应该能够向 EvaluateJsonPath 添加另一个 JSON 路径表达式,例如:

table = $.schema.name

然后在 PutHBaseJson 中将表命名为 ${table},或者在 EvaluateJsonPath 中命名它。


推荐阅读