首页 > 解决方案 > 将数据集 [数据集 [列]] 展平为数据集 [列]

问题描述

我正在尝试捕获数据库所有表的元数据。我正在使用spark.catalogapi 来提取元数据。我写了以下方法

def getColumns(tables:Dataset[Table]): Dataset[Column] = {
  tables.map(table => spark.catalog.listColumns(table.name))
}

此方法返回Dataset[Dataset[Colums]]. 如何将其转换为Dataset[Column]可以保存为 hive 中的表?请推荐是否有任何其他有效的方法来捕获所有表的元数据。

标签: scalaapache-sparkapache-spark-sql

解决方案


    // 1. only column name

    val c1 = spark.catalog.listColumns("database-name.table-name").select('name)
    // c1: org.apache.spark.sql.DataFrame = [name: string]
    c1.printSchema
    // root
    //  |-- name: string (nullable = true)

    // 2. Full info
       

    val c1 = spark.catalog.listColumns("database-name.table-name")
    // c1: org.apache.spark.sql.Dataset[org.apache.spark.sql.catalog.Column] = [name: 
    string, description: string ... 4 more fields]

    c1.printSchema
    // root
    //  |-- name: string (nullable = true)
    //  |-- description: string (nullable = true)
    //  |-- dataType: string (nullable = true)
    //  |-- nullable: boolean (nullable = false)
    //  |-- isPartition: boolean (nullable = false)
    //  |-- isBucket: boolean (nullable = false)

// 3. All info about columns in tables database
    import org.apache.spark.sql.functions._

val dbName = "set database name"

val listTables = spark.catalog.listTables(dbName)
  .select('name).as[String]
  .collect().toList

val listDF = listTables.map(t => {
  val colsDF = spark.catalog.listColumns(dbName, t)
    .withColumn("namneTable", lit(t))
    .withColumn("dbName", lit(dbName))

  colsDF
})

val resDF = listDF.reduce(_ union _)

resDF.printSchema
//  root
//  |-- name: string (nullable = true)
//  |-- description: string (nullable = true)
//  |-- dataType: string (nullable = true)
//  |-- nullable: boolean (nullable = false)
//  |-- isPartition: boolean (nullable = false)
//  |-- isBucket: boolean (nullable = false)
//  |-- namneTable: string (nullable = false)
//  |-- dbName: string (nullable = false)

resDF.write.format("parquet").saveAsTable(.....)

推荐阅读