首页 > 解决方案 > kafka connect hdfs sink连接器失败

问题描述

我正在尝试使用 Kafka 连接接收器将文件从 Kafka 写入 HDFS。

我的属性看起来像:

connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
flush.size=3
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
schema.compatability=BACKWARD
key.converter.schemas.enabled=false
value.converter.schemas.enabled=false
schemas.enable=false

当我尝试运行连接器时,出现以下异常:

org.apache.kafka.connect.errors.DataException:带有 schemas.enable 的 JsonConverter 需要“schema”和“payload”字段,并且可能不包含其他字段。如果您尝试反序列化纯 JSON 数据,请在转换器配置中设置 schemas.enable=false。

我正在使用 Confluent 4.0.0 版。

请问有什么建议吗?

标签: apache-kafkahdfsapache-kafka-connectconfluent-platform

解决方案


我对这个问题的理解是,如果你设置了 schemas.enable=true,你就是告诉 kafka 你想将 schema 包含到 kafka 必须传输的消息中。在这种情况下,kafka 消息没有纯 json 格式。相反,它首先描述模式,然后附加与模式相对应的有效负载(即实际数据)(阅读有关 AVRO 格式的信息)。这会导致冲突:一方面您为数据指定了 JsonConverter,另一方面您要求 kafka 将架构包含到消息中。要解决此问题,您可以使用带有 schemas.enable = true 的 AvroConverter 或带有 schemas.enable = false 的 JsonCONverter。


推荐阅读