json - 将来自 NiFi 流的传入 Json 消息解析为 Hbase 表
问题描述
大家好,我从 NiFi 中的 Kafka 主题中收到一条消息流,我正在通过消费者进程读取该消息。消息格式为 json(虚拟 json 值,json 格式与原始格式相同):
{ "schema": {
"type": "struct",
"name": "emp_table",
"fields": [
{
"field": "emp_id",
"type": "string"
},
{
"field": "emp_name",
"type": "String"
},
{
"field": "city",
"type": "string"
},
{
"field": "emp_sal",
"type": "string"
},
{
"field": "manager_name",
"type": "string"
}
] }, "payload": {
"emp_id": "1",
"emp_name": "abc",
"city": "NYK",
"emp_sal": "100000",
"manager_name": "xyz" } }
正如您在此处看到的,实际表名位于 Schema 下,列值位于有效负载下。我可以通过在 NiFi 中使用 EvaluateJsonPath 和 PutHBaseJson 处理器来解析列值并将其放入 Hbase 表中。
我能够实现的是手动输入表名和rowid。但我的问题是我想从 json 中获取 tablename(在上面的示例 emp_table 中)和 rowid(在上面的示例 emp_id 中),并在运行时将这些值与列值一起提供给 NiFi 中的 PutHbaseJson 处理器。
解决方案
您应该能够向 EvaluateJsonPath 添加另一个 JSON 路径表达式,例如:
table = $.schema.name
然后在 PutHBaseJson 中将表命名为 ${table},或者在 EvaluateJsonPath 中命名它。
推荐阅读
- java - 如何使用 Maven 安装 unirest?
- php - 如何将消息从 PHP 页面返回到 AJAX 调用
- java - 如何获取相同视图组件的列表
- python - 错误计算区域“IndexError:维度 1 的张量的索引过多”
- dockerfile - docker hyperledger indy 上的 node-gyp 重建错误
- containers - Podman oci .containerenv:不是目录
- python - 设置 DeepDiff 以很好地识别删除
- sql - 如何通过算法从帐户树中选择适当的帐户
- github - 仅在 GitHub Actions 中发布更改的项目
- r - 在 R 包中使用 %>%