首页 > 解决方案 > ElasticsearchSinkConnector 无法将数据反序列化到 Avro

问题描述

我创建了最简单的 kafka sink 连接器配置,我使用的是 confluent 4.1.0:

{
  "connector.class": 
  "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "type.name": "test-type",
  "tasks.max": "1",
  "topics": "dialogs",
  "name": "elasticsearch-sink",
  "key.ignore": "true",
  "connection.url": "http://localhost:9200",
  "schema.ignore": "true"
}

在主题中,我将消息保存为JSON

{ "topics": "resd"}

但结果我得到一个错误:

起因:org.apache.kafka.common.errors.SerializationException:反序列化 id -1 的 Avro 消息时出错 起因:org.apache.kafka.common.errors.SerializationException:未知魔术字节!

标签: apache-kafkaapache-kafka-connectconfluent-platform

解决方案


正如 cricket_007 所说,您需要告诉 Connect 使用 Json 反序列化器,如果这是您的数据格式。将其添加到您的连接器配置中:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false"

推荐阅读