apache-kafka - 无法使用 kafka-avro-console-consumer 读取 avro 消息。SerializationException:未知的魔术字节
问题描述
我正在编写一个 REST 代理,例如 confluent 休息代理。它采用 JSON 有效负载、模式主题和 id,然后将 JSON 有效负载作为 Avro 对象写入流中。当我使用 kafka-avro-console-consumer 阅读消息时,我收到“未知魔术字节”错误。
这是我的卡夫卡生产者配置:
properties.put("client.id", LocalHostUtils.getLocalHostName(null));
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
properties.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class);
properties.put("schema.registry.url", configValuesManager.getString("dsp_kafka.schema_registry"));
if (KafkaUtils.isKafkaEnabled()) {
this.kafkaProducer = new KafkaProducer<String, Object>(properties);
}
这就是 REST 控制器如何将传入的 JSON 转换为 Avro
Schema schema = null;
try {
schema = schemaRegistryClient.getBySubjectAndID(schemaSubject, schemaId);
} catch (RestClientException e) {
throw new IOExceptionWithCause(e);
}
log.debug(postContent);
log.info("Subject/Version {}/{} -> {}", schemaSubject, schemaId, schema);
Object data = toAvro(schema, postContent);
这是该toAvro
方法的实现:
Object toAvro(Schema schema, String jsonBody) throws IOException
{
DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
Object object = reader.read(
null, decoderFactory.jsonDecoder(schema, jsonBody));
return object;
}
然后将该对象传递给我使用上面给出的属性配置的 schemaValidatingProducer....
this.kafkaSchemaValidatingProducer.publish(topic, 0, UUID.randomUUID().toString(), data);
这是上的publish
方法kafkaSchemaValidatingProducer
public void publish(String topic, Integer partition, String key, Object data)
{
log.debug("publish topic={} key={} value={}", topic, key, data);
if (!KafkaUtils.isKafkaEnabled()) {
log.warn("Kafka is not enabled....");
return;
}
ProducerRecord<String, Object> record = new ProducerRecord<String, Object>(topic, key, data);
Future<RecordMetadata> metadataFuture = kafkaProducer.send(record, new Callback()
{
@Override
public void onCompletion(RecordMetadata metadata, Exception exception)
{
if (exception == null) {
log.info(metadata.toString());
return;
}
log.error("exception", exception);
}
});
kafkaProducer.flush();
}
这就是我阅读主题的方式
./bin/kafka-avro-console-consumer --bootstrap-server kafka-broker1:9021 --consumer.config client-ssl.properties --topic schema-validated-topic --property print.key=true --property print.value=true --value-deserializer io.confluent.kafka.serializers.KafkaAvroDeserializer --offset earliest --skip-message-on-error --partition 0 --property schema.registry.url http://schema-regisry
这导致......
[2019-08-26 16:30:36,351] ERROR Error processing message, skipping this message: (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
知道为什么我会收到“错误的幻数错误”吗?
解决方案
我解决了这个问题。那是我没有在我的命令中指定密钥反序列化器。
这是有效的命令。
./bin/kafka-avro-console-consumer \
--bootstrap-server <bootstrap-server> \
--consumer.config client-ssl.properties \
--property schema.registry.url=<schema-registry-url> \
--topic <name-of-topic> \
--property print.key=true \
--property print.value=true \
--value-deserializer io.confluent.kafka.serializers.KafkaAvroDeserializer \
--key-deserializer org.apache.kafka.common.serialization.StringDeserializer
推荐阅读
- api - 是否有我可以调用的 Superset API 来管理(添加/修改/删除)Superset 中的用户、角色权限?
- excel-formula - 请修复这个excel公式
- javascript - 如何将 javascript 模板文字输出到控制台
- python - 为什么我在 PostgreSQL 中得到不同的时区
- php - 使用 $citizenid 时无法从 mysql 获取数据
- conv-neural-network - 用于 U-Net 的 ZCA 白化/球形图像作为预处理步骤
- c# - 根据测试中的类平均值从数组 C# 输出平均值
- finagle - 为什么我的锁在 finagle (mysql) 中不起作用?
- docker - 当容器连接到多个网络时,如何仅将 Docker 容器端口公开给一个特定的 Docker 网络?
- javascript - 如何从 NodeJS 中的 Azure 函数读取文件?