python - 如何将kafka消息中的键序列化为字符串
问题描述
我正在使用 confluent-kafka,我需要将我的键序列化为字符串并产生一些消息。我有一个工作代码,用于从模式注册表中检索模式并使用它来生成消息。问题是当我尝试从本地文件读取架构时它失败了。
下面的代码是模式注册表的工作代码:
import argparse
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka import SerializingProducer
import avro.schema
SCHEMA_HOST = '192.168.40.10'
TOPIC = 'my_topic'
SCHEMA = 'path/to/schema.avsc'
# Just parse argumments
parser = argparse.ArgumentParser(description="Avro Kafka Generator")
parser.add_argument('--schema_registry_host', default=SCHEMA_HOST, help="schema registry host")
parser.add_argument('--schema', type=str, default=SCHEMA, help="schema to produce under")
parser.add_argument('--topic', type=str, default=TOPIC, help="topic to publish to")
parser.add_argument('--frequency', type=float, default=1.0, help="number of message per second")
args = parser.parse_args()
# Actual code
schema_registry_conf = {'url': "http://{}:8081".format(SCHEMA_HOST)}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
schema = schema_registry_client.get_latest_version(subject_name=TOPIC + "-value")
# schema = schema_registry_client.get_schema(schema.schema_id)
schema = schema_registry_client.get_schema(schema.schema_id)
schema_str = schema.schema_str
pro_conf = {"auto.register.schemas": False}
avro_serializer = AvroSerializer(schema_registry_client=schema_registry_client, schema_str=schema_str, conf=pro_conf)
conf = {'bootstrap.servers': "{}:9095".format(args.schema_registry_host),
'schema.registry.url': "http://{}:8081".format(args.schema_registry_host)}
# avro_producer = AvroProducer(conf, default_value_schema=value_schema)
producer_conf = {'bootstrap.servers': "{}:9095".format(SCHEMA_HOST),
'key.serializer': StringSerializer('utf_8'),
'value.serializer': avro_serializer}
avro_producer = SerializingProducer(producer_conf)
但是,当我尝试对本地文件使用变体时,它会失败:
# Read schema from local file
value_schema = avro.schema.Parse(open(args.schema, "r").read())
schema_str = open(args.schema, "r").read().replace(' ', '').replace('\n', '')
pro_conf = {"auto.register.schemas": True}
avro_serializer = AvroSerializer(schema_registry_client=schema_registry_client, schema_str=schema_str, conf=pro_conf)
这部分对两个版本都是通用的:
producer_conf = {'bootstrap.servers': "{}:9095".format(SCHEMA_HOST),
'key.serializer': StringSerializer('utf_8'),
'value.serializer': avro_serializer}
avro_producer = SerializingProducer(producer_conf)
avro_producer.produce(topic=args.topic, value=message)
我得到的错误如下;
KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="'RecordSchema' 对象没有属性 'lookup_schema'"}
显然,这不是最好的方法,我猜如果它有效,代码看起来很丑陋且容易出错。但它甚至不起作用,我需要一些帮助来了解如何读取本地模式并在AvroSerializer
之后使用它。
解决方案
推荐阅读
- elasticsearch - DSL查询从字典数组中查找元素
- javascript - 单击btn jquery时删除表tr
- sql - 选择符合条件 SQL 的所有非不同用户的列表
- angular - 带导航的 Angular FullCalendar 自定义标题按钮
- google-visualization - 在谷歌图表中结合风格、间隔和确定性
- php - 使用 PHP 获取 og:image url
- html - 阴影背景大小的问题
- database - 为什么我在 XAMP 上托管的网站在本地主机上没有数据库错误?
- blazor - 在 blazor 页面上设置参数时 IIS 和独立应用程序崩溃
- spring - 尝试将 Spring 版本从 5.1 升级到 5.2 时出错