apache-kafka - kafka connect hdfs sink连接器失败
问题描述
我正在尝试使用 Kafka 连接接收器将文件从 Kafka 写入 HDFS。
我的属性看起来像:
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
flush.size=3
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
schema.compatability=BACKWARD
key.converter.schemas.enabled=false
value.converter.schemas.enabled=false
schemas.enable=false
当我尝试运行连接器时,出现以下异常:
org.apache.kafka.connect.errors.DataException:带有 schemas.enable 的 JsonConverter 需要“schema”和“payload”字段,并且可能不包含其他字段。如果您尝试反序列化纯 JSON 数据,请在转换器配置中设置 schemas.enable=false。
我正在使用 Confluent 4.0.0 版。
请问有什么建议吗?
解决方案
我对这个问题的理解是,如果你设置了 schemas.enable=true,你就是告诉 kafka 你想将 schema 包含到 kafka 必须传输的消息中。在这种情况下,kafka 消息没有纯 json 格式。相反,它首先描述模式,然后附加与模式相对应的有效负载(即实际数据)(阅读有关 AVRO 格式的信息)。这会导致冲突:一方面您为数据指定了 JsonConverter,另一方面您要求 kafka 将架构包含到消息中。要解决此问题,您可以使用带有 schemas.enable = true 的 AvroConverter 或带有 schemas.enable = false 的 JsonCONverter。
推荐阅读
- objective-c - SKCloudServiceController.requestAuthorization 在授权时卡住
- google-kubernetes-engine - GKE:当 api 服务器关闭时如何处理 etcd 压缩/当 etcd 已满时会发生什么
- sql-server - 进程等待命令但睡眠
- r - 在R中将JSON转换为数据框
- visual-studio-2019 - 如何在布局中使用 Razor 部分标签助手
- c# - 在 Blazor Web 程序集中每秒调用 StateHasChanged() 是最佳的吗?
- c# - 根据 Project Likes、Project Connected 和 Project Rating 找到最佳项目
- selenium - 有没有办法将 WhatsApp API 与第三方应用程序一起使用?
- python - 如何使用 Appium 和 Python 使用 Android 键盘输入文本?
- python - 合并两个 Mongodb 集合并选择唯一值(pymongo)