python - 使用python将十进制数据写入avro的问题
问题描述
我正在尝试使用 Python 将一些十进制(decimal)值写入 Avro。没有十进制值时,代码工作正常;一旦加入十进制值,就会抛出 AvroTypeException:数据 {blah} 不是模式 {blah..blah} 的示例。以下是我的 Python 代码:
# Trial with the avro library: try to binary-encode one record whose
# DEPARTMENT_ID field is declared with the Avro "decimal" logical type.
# NOTE(review): indentation appears to have been lost when this snippet was
# pasted; the class and method bodies below are shown flush-left.
import avro.schema
import avro.io
import io
# NOTE(review): `json` is used below but no `import json` is visible in this
# snippet -- presumably it was imported elsewhere in the session; confirm.
from decimal import *
from decimal import Decimal as D
# Writer schema: a record of nullable fields. DEPARTMENT_ID is a union of
# "null" and bytes annotated with logicalType "decimal" (precision 38, scale 10).
schema = """{"name":"DEPARTMENT_111","type":"record","fields":[{"name":"DEPARTMENT_NAME","type":["null","string"],"default":null},{"name":"DEPARTMENT_ID","type":["null",{"type":"bytes","logicalType":"decimal","precision":38,"scale":10}]},{"name":"ETL_BATCH_SK","type":["null","long"],"default":null},{"name":"INSERT_TS","type":["null","string"],"default":null},{"name":"OP_CODE","type":["null","string"],"default":null},{"name":"PROCESSED_FLAG","type":["null","string"],"default":null}]}"""
# NOTE(review): `format_json` is not defined anywhere in the visible snippet.
print(format_json(json.loads(schema)))
parsed_schema = avro.schema.Parse(schema)
writer = avro.io.DatumWriter(parsed_schema)
# The binary encoder writes into this in-memory buffer.
bytes_writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_writer)
# JSON encoder that serializes Decimal values as floats and defers every
# other unsupported type to the base class.
class DecimalEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, D):
return float(obj)
return json.JSONEncoder.default(self, obj)
# Sample record; INSERT_TS is not supplied.
sample_department_data = {
"DEPARTMENT_NAME":"Physics",
# NOTE(review): the visible imports bind `Decimal` and `D`, not the module
# name `decimal` -- this lookup needs an `import decimal` from elsewhere.
"DEPARTMENT_ID" : decimal.Decimal("201.0"),
"ETL_BATCH_SK" : 952879684,
"OP_CODE":"I",
"PROCESSED_FLAG":"False"
}
# json.dumps returns a str, with DEPARTMENT_ID already turned into a float by
# DecimalEncoder.
sample_department_json = json.dumps(sample_department_data, cls=DecimalEncoder)
# NOTE(review): the datum handed to the writer is that JSON string rather than
# the dict above; it matches the datum printed in the reported
# AvroTypeException.
writer.write(sample_department_json, encoder)
但是,我收到以下错误。
---------------------------------------------------------------------------
AvroTypeException Traceback (most recent call last)
<ipython-input-42-d78ba6b385e2> in <module>()
47 }
48 sample_department_json = json.dumps(sample_department_data, cls=DecimalEncoder)
---> 49 writer.write(sample_department_json, encoder)
50
51 raw_bytes = bytes_writer.getvalue()
~/.pyenv/versions/3.6.0/lib/python3.6/site-packages/avro/io.py in write(self, datum, encoder)
815 # validate datum
816 if not Validate(self.writer_schema, datum):
--> 817 raise AvroTypeException(self.writer_schema, datum)
818
819 self.write_data(self.writer_schema, datum, encoder)
AvroTypeException: The datum {"DEPARTMENT_NAME": "Physics", "DEPARTMENT_ID": 201.0, "ETL_BATCH_SK": 952879684, "OP_CODE": "I", "PROCESSED_FLAG": "False"} is not an example of the schema {
"type": "record",
"name": "DEPARTMENT_111",
"fields": [
{
"type": [
"null",
"string"
],
"name": "DEPARTMENT_NAME",
"default": null
},
{
"type": [
"null",
{
"type": "bytes",
"logicalType": "decimal",
"precision": 38,
"scale": 10
}
],
"name": "DEPARTMENT_ID"
},
{
"type": [
"null",
"long"
],
"name": "ETL_BATCH_SK",
"default": null
},
{
"type": [
"null",
"string"
],
"name": "INSERT_TS",
"default": null
},
{
"type": [
"null",
"string"
],
"name": "OP_CODE",
"default": null
},
{
"type": [
"null",
"string"
],
"name": "PROCESSED_FLAG",
"default": null
}
]
}
我究竟做错了什么?
解决方案
我试着排查可能出错的地方,但没有发现明显的问题。事实上,如果改用 fastavro,它就可以正常工作,如下所示:
from decimal import Decimal
import io
import json
import fastavro
# Record schema as a JSON string: every field is a nullable union, and
# DEPARTMENT_ID is bytes annotated with logicalType "decimal"
# (precision 38, scale 10).
schema = """{"name":"DEPARTMENT_111","type":"record","fields":[{"name":"DEPARTMENT_NAME","type":["null","string"],"default":null},{"name":"DEPARTMENT_ID","type":["null",{"type":"bytes","logicalType":"decimal","precision":38,"scale":10}]},{"name":"ETL_BATCH_SK","type":["null","long"],"default":null},{"name":"INSERT_TS","type":["null","string"],"default":null},{"name":"OP_CODE","type":["null","string"],"default":null},{"name":"PROCESSED_FLAG","type":["null","string"],"default":null}]}"""

# parse_schema is given the decoded JSON object, not the raw string.
schema_dict = json.loads(schema)
parsed_schema = fastavro.parse_schema(schema_dict)

# The record is a plain dict and the Decimal is passed through unchanged;
# INSERT_TS is not supplied.
sample_department_data = {
    "DEPARTMENT_NAME": "Physics",
    "DEPARTMENT_ID": Decimal("201.0"),
    "ETL_BATCH_SK": 952879684,
    "OP_CODE": "I",
    "PROCESSED_FLAG": "False",
}

# Write the single record into an in-memory buffer.
bio = io.BytesIO()
records = [sample_department_data]
fastavro.writer(bio, parsed_schema, records)

# Rewind to the start of the buffer, then read everything back and print it.
bio.seek(0, io.SEEK_SET)
decoded_records = list(fastavro.reader(bio))
print(decoded_records)
推荐阅读
- python - 将两个列表与内部嵌套列表相交
- wpf - 键盘快捷键不移动 .NET Core 3.0 项目中的 WPF 控件
- angular - NgRX 8 效果 - createEffect() 不工作 - 类型 'Observable<…>' 不可分配给类型 'Observable<…>'
- powershell - Send-MailMessage,当 LastLogonDate = never 时状态从不?
- javascript - 修复 Typescript 中的类型
- django - AttributeError:“产品”对象没有属性“过滤器”
- r - 如何使用 google sheet api 打开基本过滤器并仍然看到所有值?
- python - 如何避免 pylint 在非 python 文件上运行
- sql - 创建维护计划以更新用户帐户状态
- ios - UISearchBar 有时会在点击视图时退出