首页 > 解决方案 > NiFi 上的 Python 2.X:json.loads 中的Ñ(和其他)问题

问题描述

我正在使用 Jython InvokeScriptedProcessor 将数据从 json 结构构造到 sql 结构。我在使用特定功能时遇到问题。json.loads。json.loads 无法识别特殊字符,如 ñ、é、á、í...

它以一种奇怪的形式写出来。而且我还没有达到任何形式来拥有它。

例如(很简单)

{"id":"ÑUECO","value":3.141592,"datetime":"....","location":"ÑUECO"}

如果我们尝试用 sql 编写它

INSERT INTO .... (id, value) VALUES ("...",3.141592);

它会失败。它让我失望。我无法使用任何返回选项返回数据,成功或失败,NiFi 的版本无关紧要。这是我的代码

    def process(self, inputStream, outputStream):
        # read input json data from flowfile content
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        data = json.loads(text) 

两者都不

        data = json.loads(text.encode("utf-8"))

工作正常。文本以 unicode 形式出现。

                def __generate_sql_transaction(input_data):
                    """ Generate SQL statement """

                    sql = """
        BEGIN;"""

                    _id = input_data.get("id")
                    _timestamp = input_data.get("timestamp")
                    _flowfile_metrics = input_data.get("metrics")
                    _flowfile_metadata = input_data.get("metadata")

                    self.valid = __validate_metrics_type(_flowfile_metrics)

                    if self.valid is True:
                        self.log.error("generate insert")
                        sql += """
            INSERT INTO
                {0}.{1} (id, timestamp, metrics""".format(schema, table)

                        if _flowfile_metadata:
                            sql += ", metadata"
                        sql += """)
            VALUES
                ('{0}', '{1}', '{2}'""".format(_id.encode("utf-8"), _timestamp, json.dumps(_flowfile_metrics))

                        self.log.error("generate metadata")
                        if _flowfile_metadata:
                            sql += ", '{}'".format(json.dumps(_flowfile_metadata).encode("utf-8"))
                        sql += """)
            ON CONFLICT ({})""".format(on_conflict)

                        if not bool(int(self.update)):
                            sql += """
                DO NOTHING;"""
                        else:
                            sql += """
                DO UPDATE
                    SET"""
                            if bool(int(self.preference)):
                                sql += """
                        metrics = '{2}' || {0}.{1}.metrics;""".format(schema, table, json.dumps(_flowfile_metrics))
                            else:
                                sql += """
                        metrics = {0}.{1}.metrics || '{2}';""".format(schema, table, json.dumps(_flowfile_metrics))

                    else:
                        return ""

                    sql += """
        COMMIT;"""
                    return sql

我再次将数据发送到 NiFi:

        output = __generate_sql_transaction(data)
        self.log.error("post generate_sql_transaction")
        self.log.error(output.encode("utf-8"))

        # If no sql_transaction is generated because requisites weren't met,
        # set the processor output with the original flowfile input.
        if output == "":
            output = text

        # write new content to flowfile
        outputStream.write(
            output.encode("utf-8")
        )

那个输出看起来像

INSERT INTO .... VALUES ("ÃUECO","2020-01-01T10:00:00",'{"value":3.1415}','{"location":"\u00d1UECO"}');

我在元数据中也有“Ñueco”,它不适用于 id 或元数据

注意:似乎 InvokeScriptedProcessor 使用 Groove 而不是 Python 可以正常工作。但我的问题是我对 Groovy 一无所知...

有没有人发现类似的问题?你是怎么解决的?

更新:

输入示例:

{"id":"ÑUECO",
 "metrics":{
     "value":3.1415
 },
 "metadata":{
     "location":"ÑUECO"
 },
 "timestamp":"2020-01-01 00:00:00+01:00"
}

期望的输出:

BEGIN;
INSERT INTO Table (id, timestamp, metrics, metadata)
VALUES ('ÑUECO', 
        '2020-01-01T00:00:00+01:00',
        '{"value":3.1415}',
        '{"location":"ÑUECO"}')
ON CONFLICT (id, timestamp)
DO UPDATE
    SET
        metrics='{"value":3.1415}' || Table.metrics;
COMMIT;

实际输出:

BEGIN;
INSERT INTO Table (id, timestamp, metrics, metadata)
VALUES ('ÃUECO', 
        '2020-01-01T00:00:00+01:00',
        '{"value":3.1415}',
        '{"location":"\u00d1UECO"}')
ON CONFLICT (id, timestamp)
DO UPDATE
    SET
        metrics='{"value":3.1415}' || Table.metrics;
COMMIT;

标签: python-2.7apache-nifijythoninvokescript

解决方案


UPD

  1. jython 不能与字节字符串一起正常工作 - 所以,不要使用.encode('utf-8')

  2. 使用java方法将内容写回具有特定编码的流文件

下面是一个正确读取和写入非 ascii 字符的示例,包括Ñ

ExecuteScript处理器与 jython 一起使用并替换_transform(text)函数体:

import traceback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class FlowWriter(StreamCallback):
    def _transform(self, text):
        # transform incoming text here
        return '@@@@' + text + '****'

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        new_text = self._transform(text)
        IOUtils.write(new_text, outputStream, StandardCharsets.UTF_8)

flowFile = session.get()
if flowFile != None:
    try:
        flowFile = session.write(flowFile, FlowWriter())
        flowFile = session.putAttribute(flowFile, "filename", 'headerfile.xml')
        session.transfer(flowFile, REL_SUCCESS)
        session.commit()
    except Exception as e:
        log.error("{}\n{}".format(e,traceback.format_exc()))
        session.rollback(True)  # put file back and penalize it

推荐阅读