首页 > 解决方案 > 如何将kafka消息中的键序列化为字符串

问题描述

我正在使用 confluent-kafka,我需要将我的键序列化为字符串并产生一些消息。我有一个工作代码,用于从模式注册表中检索模式并使用它来生成消息。问题是当我尝试从本地文件读取架构时它失败了。

下面的代码是模式注册表的工作代码:

import argparse
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka import SerializingProducer
import avro.schema

SCHEMA_HOST = '192.168.40.10'
TOPIC = 'my_topic'
SCHEMA = 'path/to/schema.avsc'

# Just parse argumments
parser = argparse.ArgumentParser(description="Avro Kafka Generator")
parser.add_argument('--schema_registry_host', default=SCHEMA_HOST, help="schema registry host")
parser.add_argument('--schema', type=str, default=SCHEMA, help="schema to produce under")
parser.add_argument('--topic', type=str, default=TOPIC, help="topic to publish to")
parser.add_argument('--frequency', type=float, default=1.0, help="number of message per second")

args = parser.parse_args()

# Actual code
schema_registry_conf = {'url': "http://{}:8081".format(SCHEMA_HOST)}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
schema = schema_registry_client.get_latest_version(subject_name=TOPIC + "-value")
# schema = schema_registry_client.get_schema(schema.schema_id)
schema = schema_registry_client.get_schema(schema.schema_id)
schema_str = schema.schema_str
pro_conf = {"auto.register.schemas": False}
avro_serializer = AvroSerializer(schema_registry_client=schema_registry_client, schema_str=schema_str, conf=pro_conf)
conf = {'bootstrap.servers': "{}:9095".format(args.schema_registry_host),
        'schema.registry.url': "http://{}:8081".format(args.schema_registry_host)}
# avro_producer = AvroProducer(conf, default_value_schema=value_schema)
producer_conf = {'bootstrap.servers': "{}:9095".format(SCHEMA_HOST),
                 'key.serializer': StringSerializer('utf_8'),
                 'value.serializer': avro_serializer}
avro_producer = SerializingProducer(producer_conf)

但是,当我尝试对本地文件使用变体时,它会失败:

# Read schema from local file
value_schema = avro.schema.Parse(open(args.schema, "r").read())
schema_str = open(args.schema, "r").read().replace(' ', '').replace('\n', '')

pro_conf = {"auto.register.schemas": True}
avro_serializer = AvroSerializer(schema_registry_client=schema_registry_client, schema_str=schema_str, conf=pro_conf)

这部分对两个版本都是通用的:

producer_conf = {'bootstrap.servers': "{}:9095".format(SCHEMA_HOST),
                 'key.serializer': StringSerializer('utf_8'),
                 'value.serializer': avro_serializer}
avro_producer = SerializingProducer(producer_conf)
avro_producer.produce(topic=args.topic, value=message)

我得到的错误如下;

KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="'RecordSchema' 对象没有属性 'lookup_schema'"}

显然,这不是最好的方法,我猜如果它有效,代码看起来很丑陋且容易出错。但它甚至不起作用,我需要一些帮助来了解如何读取本地模式并在AvroSerializer之后使用它。

标签: pythonapache-kafkaconfluent-platform

解决方案


推荐阅读