首页 > 解决方案 > 在使用 scala spark 写入 Kafka 主题之前为 Dataframe 定义模式

问题描述

我有以下数据框(finalDataFrame)架构

 root
     |-- sentence: string (nullable = true)
     |-- category: string (nullable = true)
     |-- Id: string (nullable = true)

我已经定义了以下架构

def defineS3SinkSchema() : StructType = {
    new StructType()
      .add("payload", new StructType()
        .add("sentence", StringType)
        .add("Id", LongType)
        .add("category", StringType)
        )
  }

我想将上述模式用于上面定义的数据框并写入 kafka 主题。但我不确定如何将定义的模式与数据框集成。以下是写入 kafka 主题的代码。

val jsonFormatData = finalDataFrame.select(col("key").cast("string").alias("key"),
      to_json(struct(
        col("sentence"),
        col("category"),
        col("key").as("Id")
      )).alias("value"))
    jsonFormatData.printSchema()
    val writeStream = jsonFormatData
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("topic", "myTopic.val")
      .option("checkpointLocation", "test_path")
      .start()
    writeStream.awaitTermination()
 

标签: scalaapache-sparkapache-kafkaschema

解决方案


推荐阅读