python-2.7 - NiFi 上的 Python 2.X:json.loads 中的Ñ(和其他)问题
问题描述
我正在使用 Jython InvokeScriptedProcessor 将数据从 json 结构构造到 sql 结构。我在使用特定功能时遇到问题。json.loads。json.loads 无法识别特殊字符,如 ñ、é、á、í...
它以一种奇怪的形式写出来。而且我还没有达到任何形式来拥有它。
例如(很简单)
{"id":"ÑUECO","value":3.141592,"datetime":"....","location":"ÑUECO"}
如果我们尝试用 sql 编写它
INSERT INTO .... (id, value) VALUES ("...",3.141592);
它会失败。它让我失望。我无法使用任何返回选项返回数据,成功或失败,NiFi 的版本无关紧要。这是我的代码
def process(self, inputStream, outputStream):
# read input json data from flowfile content
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
data = json.loads(text)
两者都不
data = json.loads(text.encode("utf-8"))
工作正常。文本以 unicode 形式出现。
def __generate_sql_transaction(input_data):
""" Generate SQL statement """
sql = """
BEGIN;"""
_id = input_data.get("id")
_timestamp = input_data.get("timestamp")
_flowfile_metrics = input_data.get("metrics")
_flowfile_metadata = input_data.get("metadata")
self.valid = __validate_metrics_type(_flowfile_metrics)
if self.valid is True:
self.log.error("generate insert")
sql += """
INSERT INTO
{0}.{1} (id, timestamp, metrics""".format(schema, table)
if _flowfile_metadata:
sql += ", metadata"
sql += """)
VALUES
('{0}', '{1}', '{2}'""".format(_id.encode("utf-8"), _timestamp, json.dumps(_flowfile_metrics))
self.log.error("generate metadata")
if _flowfile_metadata:
sql += ", '{}'".format(json.dumps(_flowfile_metadata).encode("utf-8"))
sql += """)
ON CONFLICT ({})""".format(on_conflict)
if not bool(int(self.update)):
sql += """
DO NOTHING;"""
else:
sql += """
DO UPDATE
SET"""
if bool(int(self.preference)):
sql += """
metrics = '{2}' || {0}.{1}.metrics;""".format(schema, table, json.dumps(_flowfile_metrics))
else:
sql += """
metrics = {0}.{1}.metrics || '{2}';""".format(schema, table, json.dumps(_flowfile_metrics))
else:
return ""
sql += """
COMMIT;"""
return sql
我再次将数据发送到 NiFi:
output = __generate_sql_transaction(data)
self.log.error("post generate_sql_transaction")
self.log.error(output.encode("utf-8"))
# If no sql_transaction is generated because requisites weren't met,
# set the processor output with the original flowfile input.
if output == "":
output = text
# write new content to flowfile
outputStream.write(
output.encode("utf-8")
)
那个输出看起来像
INSERT INTO .... VALUES ("ÃUECO","2020-01-01T10:00:00",'{"value":3.1415}','{"location":"\u00d1UECO"}');
我在元数据中也有“Ñueco”,它不适用于 id 或元数据
注意:似乎 InvokeScriptedProcessor 使用 Groove 而不是 Python 可以正常工作。但我的问题是我对 Groovy 一无所知...
有没有人发现类似的问题?你是怎么解决的?
更新:
输入示例:
{"id":"ÑUECO",
"metrics":{
"value":3.1415
},
"metadata":{
"location":"ÑUECO"
},
"timestamp":"2020-01-01 00:00:00+01:00"
}
期望的输出:
BEGIN;
INSERT INTO Table (id, timestamp, metrics, metadata)
VALUES ('ÑUECO',
'2020-01-01T00:00:00+01:00',
'{"value":3.1415}',
'{"location":"ÑUECO"}')
ON CONFLICT (id, timestamp)
DO UPDATE
SET
metrics='{"value":3.1415}' || Table.metrics;
COMMIT;
实际输出:
BEGIN;
INSERT INTO Table (id, timestamp, metrics, metadata)
VALUES ('ÃUECO',
'2020-01-01T00:00:00+01:00',
'{"value":3.1415}',
'{"location":"\u00d1UECO"}')
ON CONFLICT (id, timestamp)
DO UPDATE
SET
metrics='{"value":3.1415}' || Table.metrics;
COMMIT;
解决方案
UPD
jython 不能与字节字符串一起正常工作 - 所以,不要使用
.encode('utf-8')
使用java方法将内容写回具有特定编码的流文件
下面是一个正确读取和写入非 ascii 字符的示例,包括Ñ
将ExecuteScript
处理器与 jython 一起使用并替换_transform(text)
函数体:
import traceback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class FlowWriter(StreamCallback):
def _transform(self, text):
# transform incoming text here
return '@@@@' + text + '****'
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
new_text = self._transform(text)
IOUtils.write(new_text, outputStream, StandardCharsets.UTF_8)
flowFile = session.get()
if flowFile != None:
try:
flowFile = session.write(flowFile, FlowWriter())
flowFile = session.putAttribute(flowFile, "filename", 'headerfile.xml')
session.transfer(flowFile, REL_SUCCESS)
session.commit()
except Exception as e:
log.error("{}\n{}".format(e,traceback.format_exc()))
session.rollback(True) # put file back and penalize it
推荐阅读
- python - SyntaxError:python 脚本上的语法无效
- python - 如何从 Python re.search 中删除返回的对象 <_sre.SRE_Match 对象?
- sql - PostgreSQL - 如何创建对两个属性求和的派生属性?
- python - 如何将值附加到 Keras 深度网络中的张量
- opengl - OpenGL/GLSL 多纹理绑定不起作用
- sql - 使用 SQL QUERY 忽略表中的重复记录(不删除)
- yarnpkg - yarn workspace 如何解析多个版本的 cli 包
- python - 从python中的列表中添加2个整数
- solr - Solr 区分大小写搜索
- node.js - 如何为 libvirt api 编写 nodejs 包装脚本