首页 > 解决方案 > Spark从镶木地板文件中读取的列名中删除特殊字符

问题描述

我有使用以下 spark 命令读取的镶木地板文件

lazy val out = spark.read.parquet("/tmp/oip/logprint_poc/feb28eb24ffe44cab60f2832a98795b1.parquet")

很多列的列名都有特殊字符“(”。比如WA_0_DWHRPD_Purge_Date_(TOD)WA_0_DWHRRT_Record_Type_(80=Index)我怎样才能删除这个特殊字符。

我的最终目标是删除这些特殊字符并使用以下命令写回镶木地板文件

df_hive.write.format("parquet").save("hdfs:///tmp/oip/logprint_poc_cleaned/")

另外,我正在使用 Scala spark shell。我是新手,我看到了类似的问题,但在我的情况下没有任何效果。任何帮助表示赞赏。

标签: apache-sparkparquet

解决方案


您可以做的第一件事是在执行操作时将 parquet 文件读入数据框中。

val out = spark.read.parquet("/tmp/oip/logprint_poc/feb28eb24ffe44cab60f2832a98795b1.parquet")

创建数据框后,尝试获取数据框的架构并对其进行解析以删除所有特殊字符,如下所示:

import org.apache.spark.sql.functions._
val schema = StructType(out.schema.map(
          x => StructField(x.name.toLowerCase().replace(" ", "_").replace("#", "").replace("-", "_").replace(")", "").replace("(", "").trim(),
            x.dataType, x.nullable)))

现在,您可以通过指定您创建的模式从 parquet 文件中读取数据。

val newDF = spark.read.format("parquet").schema(schema).load("/tmp/oip/logprint_poc/feb28eb24ffe44cab60f2832a98795b1.parquet")

现在,您可以继续使用已清理的列名称保存数据框。

df_hive.write.format("parquet").save("hdfs:///tmp/oip/logprint_poc_cleaned/")

推荐阅读