首页 > 解决方案 > Nifi,Json 数据

问题描述

我有 JSON 输入数据作为

{
  "type": "insert",
  "timestamp": 1536959101000,
  "binlog_filename": "mysql-bin-changelog.234234",
  "binlog_position": 223,
  "database": "test",
  "table_name": "demo",
  "table_id": 138,
  "columns": [
    {
      "id": 1,
      "name": "id",
      "column_type": 12,
      "value": "IboECKV "
    },
    {
      "id": 2,
      "name": "col2",
      "column_type": 93,
      "value": "Fri Sep 14 21:05:02 UTC 2018"
    },
    {
      "id": 3,
      "name": "col3",
      "column_type": 4,
      "value": 10
    },
    {
      "id": 4,
      "name": "col4",
      "column_type": 4,
      "value": 0
    }
  ]
}

如果 column_type =93(日期时间):将值转换为:yyyy-MM-dd HH:mm:ss.SSSZ

所以输出的目标是

[
  {
    "id": "IboECKV "
  },
  {
    "col2": "2018-09-14 21:05:02.000Z"
  },
  {
    "col3": 10
  },
  {
    "col4": 0
  }
]

你知道如何解决这个案子吗?

非常感谢,

标签: arraysjsonapache-nifijolt

解决方案


您可以使用ExecuteScript和利用Groovy来解析 Json 输入和日期并将其格式化为您想要使用的任何格式SimpleDateFormat

一个简单的例子:

import java.text.SimpleDateFormat 
import java.util.Date
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets

flowFile = session.get()
if(!flowFile)return
def text = ''

session.read(flowFile, {inputStream ->
  text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} as InputStreamCallback)

def jsonSlurper = new JsonSlurper()
def object = jsonSlurper.parseText(text)

def columnsSize = object.columns.size

0.upto(columnsSize - 1) {
    if (object.columns[it].column_type == 93 ) {
        oldDate = object.columns[it].value
        sdfmt2= new SimpleDateFormat('dd-M-yyyy')
        parsedDate = sdfmt2.parse(oldDate)
        object.columns[it].value = parsedDate
        output = JsonOutput.toJson(object)

        flowFile = session.write(flowFile, {outputStream ->
            outputStream.write(output.getBytes(StandardCharsets.UTF_8))
        } as OutputStreamCallback)
        session.transfer(flowFile, REL_SUCCESS)
    }
}

推荐阅读