apache-kafka - 无法通过 kafka-avro-console-consumer 读取 avro 消息(最终目标通过 spark 流读取它)
问题描述
(最终目标)在尝试我是否最终可以从 Confluent 平台中读取 avro 数据、usng spark 流之前,如下所述:Integrating Spark Structured Streaming with the Confluent Schema Registry
我想验证我是否可以使用以下命令来阅读它们:
$ kafka-avro-console-consumer \
> --topic my-topic-produced-using-file-pulse-xml \
> --from-beginning \
> --bootstrap-server localhost:9092 \
> --property schema.registry.url=http://localhost:8081
我收到此错误消息,未知魔术字节
Processed a total of 1 messages
[2020-09-10 12:59:54,795] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2020-09-10 12:59:54,795] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
请注意,该消息可以这样读取(使用控制台消费者而不是 avro-console-消费者):
kafka-console-consumer \
--bootstrap-server localhost:9092 --group my-group-console \
--from-beginning \
--topic my-topic-produced-using-file-pulse-xml
该消息是使用 confluent connect file-pulse (1.5.2) 读取 xml 文件 (streamthoughts/kafka-connect-file-pulse) 生成的
请在这里帮忙:
我用kafka-avro-console-consumer
错了吗?我尝试了此处描述的“解串器”属性选项:https ://stackoverflow.com/a/57703102/4582240 ,没有帮助
我还不想勇敢地启动火花流来读取数据。
我使用的 file-pulse 1.5.2 属性如下所示,于 2020 年 11 月 9 日添加完成。
name=connect-file-pulse-xml
connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
topic= my-topic-produced-using-file-pulse-xml
tasks.max=1
# File types
fs.scan.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter
file.filter.regex.pattern=.*\\.xml$
task.reader.class=io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader
force.array.on.fields=sometagNameInXml
# File scanning
fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy
fs.scanner.class=io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker
fs.scan.directory.path=/tmp/kafka-connect/xml/
fs.scan.interval.ms=10000
# Internal Reporting
internal.kafka.reporter.bootstrap.servers=localhost:9092
internal.kafka.reporter.id=connect-file-pulse-xml
internal.kafka.reporter.topic=connect-file-pulse-status
# Track file by name
offset.strategy=name
解决方案
如果您从消费者那里获得 Unknown Magic Byte,那么生产者没有使用 Confluent AvroSerializer,并且可能推送了不使用 Schema Registry 的 Avro 数据。
如果没有看到生产者代码或消费和检查二进制格式的数据,很难知道是哪种情况。
该消息是使用 confluent connect file-pulse 生成的
您是否使用value.converter
了 AvroConverter 类?
推荐阅读
- kubernetes - 如何将所有 kubernetes 入口 yamls 转换为使用 API 版本networking.k8s.io/v1
- iis - 在 IIS 上发布时无法加载 Blazor webassembly 应用程序
- excel - 在名称管理器中查找和替换名称
- javascript - Docusign Templates Api Parse Error Unexpected Token In Response
- python - Python 类扩展协议并实现类
- docker - docker克隆一个现有的elasticsearch容器
- react-native - React Native,在使用 Axios 发布 FormData 时出现网络错误
- amazon-web-services - 在 ec2 实例中启动 Docker 失败?
- c - 在 C 中通过引用传递的强制转换函数参数
- java - Postgres smallint[] 转换成 java short[] 数组