首页 > 解决方案 > Kafka Connect:读取 JSON 序列化的 Kafka 消息,转换为 Parquet 格式并保存在 S3 中

问题描述

我需要从 Kafka 主题读取 JSON 序列化消息,将它们转换为 Parquet 并保留在 S3 中。

背景

官方S3-Sink-Connector支持 Parquet 输出格式,但是:

对于此连接器,您必须将 AvroConverter、ProtobufConverter 或 JsonSchemaConverter 与 ParquetFormat 一起使用。尝试使用 JsonConverter(带或不带模式)会导致 NullPointerException 和 StackOverflowException。

如果消息不是使用 JSON Schema serialization 写入的,则 JsonSchemaConverter 会抛出错误

问题陈述

因此,我正在寻找一种方法来从最初以 JSON 格式编写的 Kafka 主题读取消息,以某种方式将它们转换为 JSON Schema 格式,然后将它们插入将以 Parquet 格式写入 S3 的 S3 连接器。

或者,鉴于主要要求(获取 Kafka 消息,将其作为 Parquet 文件放入 S3),我也愿意接受替代解决方案(-不涉及编写 JAVA 代码)。谢谢!

PS:不幸的是,改变这些 Kafka 消息最初的编写方式(例如使用带有Schema Discovery的JSON Schema 序列化)目前不是我的选择。

标签: jsonapache-kafkaparquetapache-kafka-connects3-kafka-connector

解决方案


一般来说,您的数据需要有一个模式,因为 Parquet 需要它(S3 parquet 编写器转换为 Avro 作为中间步骤)

您可以考虑使用这个接受模式的 Connect 转换,并尝试应用 JSON 模式-请参阅测试。由于这会返回一个Struct对象,那么您可以尝试将其JsonSchemaConverter用作接收器的一部分。

但是,如果您只是将随机 JSON 数据放入一个没有任何一致字段或值的主题中,那么您将很难应用任何模式


推荐阅读