首页 > 解决方案 > 更新数据框中的 JSON 文件时的序列化问题

问题描述

我读入一个JSON文件并将其存储在Dataframe中。

val df1 = spark.read.option("multiline", "true")
            .json("dbfs:/something.json")

该文件的Schema如下所示:

Connections:array
    element:struct
           Name:string
           Properties:struct
                   database:string
                   driver:string
                   hostname:string
                   password.encrypted:string
                   password.encrypted.keyARN:string
                   port:string
                   username:string
           Type:string

我想构建一个可以在我想添加新连接时重用的函数。

我不确定最好的方法是什么,我是否应该构建一个新模式,用数据填充它并将其附加到原始 Connections 数组,然后简单地写回文件?

这就是我尝试使其工作的方式,但是序列化存在错误。

import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType, ArrayType, FloatType}

val zipsSchema3 = StructType(List(
  StructField("Name", StringType, true), 
  StructField("Properties", StructType(List(
      StructField("driver", StringType, true), 
      StructField("hostname", StringType, true), 
      StructField("password.encrypted", StringType, true), 
      StructField("password.encrypted.keyARN", StringType, true), 
      StructField("port", StringType, true), 
      StructField("username", StringType, true)
 ))),
  StructField("Type", StringType, true)
))

val data2 = Seq(
  Row("db2", struct("test","testHost","encpwd","keyTest","testPort","testUser"), "typeTest"))

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data2),
  zipsSchema3
)

或者是否有一些内置函数可以在这种情况下使用?

预先感谢您的所有建议!:)

标签: jsonscalainsert-updatedatabricks

解决方案


我不确定为什么,但是当我像这样运行它时,序列化错误就消失了。

 val zipsSchema3 = StructType(List(
      StructField("Name", StringType, true), 
      StructField("Properties", StructType(List(
          StructField("driver", StringType, true), 
          StructField("hostname", StringType, true), 
          StructField("password.encrypted", StringType, true), 
          StructField("password.encrypted.keyARN", StringType, true), 
          StructField("port", StringType, true), 
          StructField("username", StringType, true)
     ))),
      StructField("Type", StringType, true)
    ))

val data2 = Seq(("db2", Seq("test","testHost","encpwd","keyTest","testPort","testUser"), "typeTest"))

val rdd = spark.sparkContext.parallelize(data2)
  .map{ case (name, props, sType) => Row(name, props, sType ) }

val df = spark.createDataFrame(
  rdd,
  zipsSchema3  
)

推荐阅读