python - Confluent Python Avro Producer:数据 {'..'} 不是模式的示例
问题描述
我无法为特定模式生成数据,我无法理解为什么。代码中包含为字典的示例数据是直接使用 Confluent 的“avro-random-generator”创建的,因此示例数据必须是正确的,因为它是直接从模式派生的。Schema Registry 和 Avro Random Generator 都是 Confluent 工具,因此他们的工具不可能生成不适用于他们的模式注册表的示例数据。
这是架构:
{
"type": "record",
"name": "schemaV1",
"namespace": "net.avro.schemaV1",
"doc": "",
"fields": [
{
"name": "orderId",
"type": {
"type": "string",
"avro.java.string": "String"
},
"doc": ""
},
{
"name": "offerId",
"type": {
"type": "string",
"avro.java.string": "String"
},
"doc": ""
},
{
"name": "redeemerId",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"doc": "",
"default": null
},
{
"name": "eventCancellationType",
"type": "int",
"doc": ""
},
{
"name": "ruleIds",
"type": {
"type": "array",
"items": {
"type": "string",
"avro.java.string": "String"
},
"doc": ""
}
},
{
"name": "eventOriginator",
"type": {
"type": "record",
"name": "AvroEventPartnerV1",
"doc": "",
"fields": [
{
"name": "partnerShortName",
"type": {
"type": "string",
"avro.java.string": "String"
},
"doc": ""
},
{
"name": "businessUnitShortName",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"doc": "",
"default": null
},
{
"name": "branchShortName",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"doc": "",
"default": null
}
]
}
},
{
"name": "roundedDelta",
"doc": "",
"type": {
"type": "record",
"name": "AvroAmountV1",
"doc": "Amount with a currency",
"fields": [
{
"name": "amount",
"type": {
"type": "bytes",
"logicalType": "decimal",
"precision": 21,
"scale": 3
},
"doc": "The amount as a decimal number"
},
{
"name": "currency",
"type": {
"type": "string",
"avro.java.string": "String"
},
"doc": ""
}
]
}
},
{
"name": "rewardableLegalDelta",
"type": [
"null",
"AvroAmountV1"
],
"doc": "",
"default": null
},
{
"name": "receiptNumber",
"type": {
"type": "string",
"avro.java.string": "String"
},
"doc": ""
},
{
"name": "referenceReceiptNumber",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"doc": "",
"default": null
},
{
"name": "eventEffectiveTime",
"type": {
"type": "long"
},
"doc": ""
}
]
}
这是我的脚本:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, ClientError, ValueSerializerError
BOOTSTRAP_SERVER = 'localhost:9092'
SCHEMA_REGISTRY = 'http://localhost:8081'
TOPIC = 'topicV1'
SCHEMA_PATH = 'schemas/schemaV1.avsc'
def schemaReader(SCHEMA_PATH):
    """Read the Avro schema file at *SCHEMA_PATH* and return its full text.

    The returned string is the raw ``.avsc`` JSON, suitable for
    ``confluent_kafka.avro.loads``.
    """
    # "schema_file" instead of "file": "file" shadows a Python 2 builtin
    # (the traceback shows this runs under python2.7).
    with open(SCHEMA_PATH, 'r') as schema_file:
        return schema_file.read()
def _decimal_to_bytes(number, scale):
    """Encode *number* as an Avro ``decimal`` byte string.

    Avro's decimal logical type stores the unscaled integer value
    (``number * 10**scale``) as a big-endian two's-complement byte string.
    Works on both Python 2 and 3 (no ``int.to_bytes``).
    """
    unscaled = int(round(number * (10 ** scale)))
    # One extra bit for the sign, rounded up to whole bytes; at least 1 byte.
    length = max(1, (unscaled.bit_length() + 8) // 8)
    unscaled &= (1 << (8 * length)) - 1  # two's complement for negatives
    return bytes(bytearray((unscaled >> (8 * i)) & 0xFF
                           for i in reversed(range(length))))


def main():
    """Produce one ``schemaV1`` record to Kafka via the Schema Registry."""
    kafka_config = {
        'bootstrap.servers': BOOTSTRAP_SERVER,
        'schema.registry.url': SCHEMA_REGISTRY,
    }
    value_schema = avro.loads(schemaReader(SCHEMA_PATH))
    # FIX: "amount" is declared as type "bytes" with logicalType "decimal"
    # (precision 21, scale 3), so the datum must be a byte string holding
    # the unscaled two's-complement integer -- NOT a unicode string such as
    # "\u8463". Passing text is what raised AvroTypeException.
    value = {
        "orderId": "a9bcc55f-e2c0-43d6-b793-ff5f295d051d",
        "offerId": "119475017578242889",
        "redeemerId": "1176a01b-b2dc-45a9-91cc-232361e14f99",
        "eventCancellationType": 0,
        "ruleIds": ["ID-IPM00001"],
        "eventOriginator": {
            "partnerShortName": "partner",
            "businessUnitShortName": None,
            "branchShortName": None,
        },
        # 12.500 with scale 3 -> unscaled 12500 -> b'\x30\xd4'
        "roundedDelta": {"amount": _decimal_to_bytes(12.5, 3),
                         "currency": "PTS"},
        "rewardableLegalDelta": {"amount": _decimal_to_bytes(12.5, 3),
                                 "currency": "EUR"},
        "receiptNumber": "19b2ff68-ed06-48f0-9ce9-d697c0eadc19",
        "referenceReceiptNumber": None,
        "eventEffectiveTime": 1569494696656,
    }
    avroProducer = AvroProducer(kafka_config, default_value_schema=value_schema)
    avroProducer.produce(topic=TOPIC, value=value, value_schema=value_schema)
    # Block until delivery so the script doesn't exit with the message queued.
    avroProducer.flush()
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
这是我收到的回溯:
File "producer.py", line 64, in <module>
main()
File "producer.py", line 60, in main
avroProducer.produce(topic=TOPIC, value=value, value_schema=value_schema)
File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/__init__.py", line 80, in produce
value = self._serializer.encode_record_with_schema(topic, value_schema, value)
File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 115, in encode_record_with_schema
return self.encode_record_with_schema_id(schema_id, record, is_key=is_key)
File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 149, in encode_record_with_schema_id
writer(record, outf)
File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 86, in <lambda>
return lambda record, fp: writer.write(record, avro.io.BinaryEncoder(fp))
File "/apps/python/python2.7/lib/python2.7/site-packages/avro/io.py", line 1042, in write
raise AvroTypeException(self.writers_schema, datum)
avro.io.AvroTypeException: The datum {'..'} is not an example of the schema { ..}
解决方案
似乎问题在于 amount 字段应该是字节(bytes)类型,但您传入的却是一个普通的字符串 \u8463。
您提到的用于生成随机数据的库使用 java 默认字符集创建一个字节字符串:https://github.com/confluentinc/avro-random-generator/blob/master/src/main/java/io/confluent/avro/random/generator/Generator.java#L373
但是,也许该默认值不是 java 实现(参考实现)使用的 iso-8859-1:https://github.com/apache/avro/blob/bf47ec97e0b7f5701042fac067b73b421a9177b7/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java#L220
推荐阅读
- blockchain - 同一笔交易的不同gas量
- c# - 如何用指针作为参数在 C# 中包装 C++ DLL?
- c# - 在 IHostedService 中应该如何使用取消令牌?
- database - 多租户共享环境中表的水平分区
- postgresql - 使用新的 docker-compose 时用户的密码验证失败
- html - 如何在Angular中按顺序上传文件?
- grails - 如何使用 createCriteria 在 Grails 中执行深度查询
- angular - Angular 4将两个单独的元素一起滚动
- html - 具有所有图像尺寸的图像覆盖父级
- ionic3 - 如何使用 angularfire2 更新 firebase 对象的子对象