apache-kafka - 如何在 kafka 控制台生产者中添加键序列化器和值序列化器
问题描述
我在 spring boot kafka producer application.yaml 中设置了以下属性
消费者属性:
key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
生产者属性:
key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
我必须从 kafka 控制台生产者那里生成消息,例如-
kafka-console-producer --bootstrap-server confluent-cp-kafka:9092 --topic TSTTOPIC --producer-property key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
但它不起作用,当我从控制台生产者生成消息时,我在消费者日志中收到错误,如下所示
解决方案
您不能在 CLI 上使用冒号。
如果你想使用你的属性文件,然后--producer.config
通过producer.properties
文件
否则,您可以kafka-avro-console-producer
与--producer-property key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
至于 Avro 序列化程序,您似乎缺少任何key.schema
or value.schema
+ schema.registry.url
,它们只是由 the 读取的属性,kakfa-avro-console-producer
并且可以解释为什么您的 Avro 消费者将无法读取数据(它以明文形式发送)
推荐阅读
- c++ - 重载运算符的范围是什么?它会影响作为类成员的集合的插入函数吗?
- youtube-api - 经过审核,我们仍然收到“暂时禁用此应用的 google 登录”
- python - 使用 matplotlib.animation.FuncAnimation 对分类数据进行动画处理
- flutter - 颤振不返回值
- npm - private nexus npm proxy registry and tarball location
- python - how to calculate surface area of a pyramid in python?
- android - Error inflating class Fragment, when adding navGraph attribute
- json - Mule 4 - Is there a recommended method of chunking a huge payload of say 3000 records in a JSON array into groups of 100 records
- android - How display thumbnail image in Exoplayer video?
- python - 有没有办法在有或没有 api 的情况下检查流媒体是否在 twitch 上直播?