Adding new columns to a Spark DataFrame by dynamically casting columns to given types

Problem description

I have two DataFrames with the same columns but different schemas.

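import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Target schema: df is empty and only carries the desired column types
val schema = StructType(
  StructField("firstName", StringType, true) ::
  StructField("lastName", IntegerType, false) ::
  StructField("lod", DateType, false) ::
  StructField("dob", DateType, false) :: Nil)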
    
val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

val df2 = Seq(("eturi","1","15/10/2020","Jun/01/2012"),
("vijaynull","12","25/08/2001","Aug/14/2014"),
("lakshmi","xyz",null,"Jul/15/2015")
).toDF("firstName","lastName","lod","dob")

 df2.printSchema
root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- lod: string (nullable = true)
 |-- dob: string (nullable = true)

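Now I need to cast the df2 columns to the data types of the matching df columns, adding one new column for each column of df2. I also need to convert all date and timestamp columns according to the patterns given in a separate list.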

val date_formats = List("dd/MM/yyyy","MMM/dd/yyyy")
val df_data_val = df.columns.foldLeft(df2) { case (tmpdf, c) =>
  df.schema(c).dataType match {
    case DateType =>
      // This first check is always true, since date_formats contains "dd/MM/yyyy",
      // so every date column is parsed with that one pattern only
      if (date_formats.contains("dd/MM/yyyy")) {
        tmpdf.withColumn(
          c + "_cast",
          to_date(unix_timestamp(df2(c), "dd/MM/yyyy").cast(TimestampType))
        )
      } else if (date_formats.contains("MMM/dd/yyyy")) {
        tmpdf.withColumn(
          c + "_cast",
          to_date(unix_timestamp(df2(c), "MMM/dd/yyyy").cast(TimestampType))
        )
      } else {
        tmpdf.withColumn(c + "_cast", df2(c).cast(df.schema(c).dataType))
      }
    case _ => tmpdf.withColumn(c + "_cast", df2(c).cast(df.schema(c).dataType))
  }
}

The actual output and the expected output were posted as images, which are not reproduced here.

Tags: scala, dataframe, apache-spark

Solution


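You can use coalesce instead of if/else to handle the different date formats. Because coalesce returns its first non-null argument, each value is parsed with every pattern in date_formats and the first successful parse is kept.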

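import org.apache.spark.sql.functions.{coalesce, to_date, unix_timestamp}
import org.apache.spark.sql.types.{DateType, TimestampType}

val date_formats = List("dd/MM/yyyy","MMM/dd/yyyy")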

val df_data_val = df.columns.foldLeft(df2) { case (tmpdf, c) =>
  df.schema(c).dataType match {
    case DateType => tmpdf.withColumn(
      c + "_cast", 
      coalesce(date_formats.map(f => to_date(unix_timestamp(df2(c), f).cast(TimestampType))):_*)
    )
    case _ => tmpdf.withColumn(c + "_cast", df2(c).cast(df.schema(c).dataType))
  }
}

df_data_val.show
+---------+--------+----------+-----------+--------------+-------------+----------+----------+
|firstName|lastName|       lod|        dob|firstName_cast|lastName_cast|  lod_cast|  dob_cast|
+---------+--------+----------+-----------+--------------+-------------+----------+----------+
|    eturi|       1|15/10/2020|Jun/01/2012|         eturi|            1|2020-10-15|2012-06-01|
|vijaynull|      12|25/08/2001|Aug/14/2014|     vijaynull|           12|2001-08-25|2014-08-14|
|  lakshmi|     xyz|      null|Jul/15/2015|       lakshmi|         null|      null|2015-07-15|
+---------+--------+----------+-----------+--------------+-------------+----------+----------+
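
The first-successful-parse behaviour that coalesce gives here can also be sketched in plain Scala, without Spark, which may help when reasoning about which pattern wins for a given value. This is only an illustration under stated assumptions: parseFirst is a hypothetical helper (not part of Spark), it uses java.time rather than Spark's own parser, and Locale.ENGLISH is assumed so that month abbreviations such as "Jun" are recognised.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.Locale
import scala.util.Try

object FirstMatchingFormat {
  // Same patterns as date_formats in the answer above.
  val dateFormats: List[String] = List("dd/MM/yyyy", "MMM/dd/yyyy")

  // Hypothetical helper: try each pattern in order and keep the first
  // successful parse, mirroring how coalesce keeps the first non-null value.
  // A null input yields None, just as a null cell stays null in Spark.
  def parseFirst(value: String, formats: List[String]): Option[LocalDate] =
    Option(value).flatMap { v =>
      formats.iterator
        .map(f => Try(LocalDate.parse(v, DateTimeFormatter.ofPattern(f, Locale.ENGLISH))).toOption)
        .collectFirst { case Some(d) => d }
    }

  def main(args: Array[String]): Unit = {
    println(parseFirst("15/10/2020", dateFormats))  // Some(2020-10-15)
    println(parseFirst("Jun/01/2012", dateFormats)) // Some(2012-06-01)
    println(parseFirst(null, dateFormats))          // None
  }
}
```

As with coalesce, the order of the patterns matters only when a value could be parsed by more than one of them; the two patterns used here cannot both match the same string.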
