首页 > 解决方案 > 从 KSQL 流中使用 AVRO Kafka 主题时出错

问题描述

我在 KSQLDB 中创建了一些 dummydata 作为 Stream VALUE_FORMAT='JSON' TOPIC='MYTOPIC'

设置在 Docker-compose 上。我正在运行 Kafka Broker、Schema-registry、ksqldbcli、ksqldb-server、zookeeper

现在我想从主题中使用这些记录。我的第一个也是最后一个方法是通过命令行使用以下命令

docker run  --net=host  --rm  confluentinc/cp-schema-registry:5.0.0  kafka-avro-console-consumer
--bootstrap-server localhost:29092 --topic DXT --from-beginning --max-messages 10
--property print.key=true --property print.value=true
--value-deserializer io.confluent.kafka.serializers.KafkaAvroDeserializer
--key-deserializer org.apache.kafka.common.serialization.StringDeserializer

但这只会返回错误

[2021-04-22 21:45:42,926] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

我还在 Java Spring 中尝试了不同的用例,但没有成功。我只是无法使用创建的主题。如果我需要定义自己的模式,我应该在哪里做,最简单的方法是什么,因为我刚刚在 Ksqldb 中创建了一个流?是否有一个易于遵循的示例。当我像在Ksqldb.io上的快速入门示例中那样创建流时,我没有指定任何其他内容。(我在我的部署中添加了模式注册表)因为我是一个在这里坐了将近 10 个小时的菜鸟,任何帮助将不胜感激。

编辑:我发现纯 JSON 不需要带有 ksqldb 的 Schema-registry。在这里。但是如何反序列化呢?

标签: apache-kafkaksqldb

解决方案


If you've written JSON data to the topic then you can read it with the kafka-console-consumer.

The error you're getting (Error deserializing Avro message for id -1…Unknown magic byte!) is because you're using the kafka-avro-console-consumer which attempts to deserialise the topic data as Avro - which it isn't, hence the error.

You can also use PRINT DXT; from within ksqlDB.


推荐阅读