Parsing different timestamp formats in Spark

Problem description

I have a csv file in which some columns are timestamps in the format "dd/MM/yyyy HH:mm:ss", while other columns in the same .csv file contain timestamps in the format "dd-MM-yyyy HH:mm:ss". When reading the csv file in spark, I tried something like this:

  SparkSession spark = SparkSession
      .Builder()
      .AppName("Spark Project")
      .GetOrCreate();

  spark.Read()
      .Option("delimiter", fileconfig.FileLoaderColumnSeparator)
      .Option("header", hasHeader)
      .Option("inferSchema", true)
      .Option("TimeStampFormat", "dd/MM/yyyy HH:mm:ss")
      .Option("TimeStampFormat", "dd-MM-yyyy HH:mm:ss")
      .Option("TreatEmptyValuesAsNulls", true)
      .Option("IgnoreLeadingWhiteSpace", true)
      .Option("IgnoreTrailingWhiteSpace", true)
      .Csv(path);

But this way, only the last TimestampFormat is treated as a timestamp, and columns in the first format come back as strings. I also tried .Option("TimeStampFormat", "dd-MM-yyyy HH:mm:ss", "dd/MM/yyyy HH:mm:ss") and .Option("TimeStampFormat", "dd-MM-yyyy HH:mm:ss, dd/MM/yyyy HH:mm:ss"), but neither of these options works. How can I parse both timestamp formats?

If I omit the timestampFormat option, all timestamps are kept as strings.

Tags: .net, apache-spark

Solution


You cannot specify two timestamp formats when reading a csv file: the value set last for the timestampFormat option simply overwrites the one set before it, so only that last format is applied.
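For illustration, here is a minimal sketch of that behaviour (hypothetical file path and column names; it assumes the Scala API with schema inference enabled, so the actual types should be verified on your Spark version):

import java.nio.file.{Files, Paths}

// Hypothetical two-column sample: ts1 uses slashes, ts2 uses dashes.
val csv =
  """ts1,ts2
    |25/12/2019 10:00:00,25-12-2019 10:00:00""".stripMargin
Files.write(Paths.get("/tmp/two_formats.csv"), csv.getBytes)

val sample = spark.read
  .option("header", true)
  .option("inferSchema", true)
  .option("timestampFormat", "dd/MM/yyyy HH:mm:ss")
  .option("timestampFormat", "dd-MM-yyyy HH:mm:ss") // silently replaces the line above
  .csv("/tmp/two_formats.csv")

sample.printSchema() // expected: ts1 stays string, ts2 is inferred as timestamp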

These are the possible options I could think of:

1. Use withColumn while reading the csv file:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

spark.read.
    option("delimiter", fileconfig.FileLoaderColumnSeparator).
    option("header", hasHeader).
    option("inferSchema", true).
    option("TimeStampFormat", "dd-MM-yyyy HH:mm:ss").
    option("TreatEmptyValuesAsNulls", true).
    option("IgnoreLeadingWhiteSpace", true).
    option("IgnoreTrailingWhiteSpace", true).
    csv(path).
    withColumn("<MM/dd/yyyy_field_name>", to_timestamp(col("<MM/dd/yyyy_field_name>"), "MM/dd/yyyy HH:mm:ss"))
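With this approach only the columns that use the other (slash-separated) format need the explicit withColumn conversion; the dd-MM-yyyy columns are already parsed as timestamps by the reader option.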

Or first read the csv file with only one timestamp format:

val df = spark.read.
    option("delimiter", fileconfig.FileLoaderColumnSeparator).
    option("header", hasHeader).
    option("inferSchema", true).
    option("TimeStampFormat", "dd-MM-yyyy HH:mm:ss").
    option("TreatEmptyValuesAsNulls", true).
    option("IgnoreLeadingWhiteSpace", true).
    option("IgnoreTrailingWhiteSpace", true).
    csv(path)

2. Then use the to_timestamp function to change the column type from string to timestamp:

From Spark >= 2.2:

df.withColumn("<MM/dd/yyyy_field_name>", to_timestamp(col("<MM/dd/yyyy_field_name>"), "MM/dd/yyyy HH:mm:ss"))
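As a quick, self-contained sanity check (hypothetical column name ts and a made-up sample value), this is roughly what to_timestamp does to a string column:

import spark.implicits._
import org.apache.spark.sql.functions.{col, to_timestamp}

val demo = Seq("12/25/2019 09:23:23").toDF("ts")          // ts starts out as a string column
val parsed = demo.withColumn("ts", to_timestamp(col("ts"), "MM/dd/yyyy HH:mm:ss"))
parsed.printSchema()   // ts: timestamp (nullable = true)
parsed.show(false)     // expected value: 2019-12-25 09:23:23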

3. Using the from_unixtime and unix_timestamp functions:

df.withColumn("<MM/dd/yyyy_field_name>",from_unixtime(unix_timestamp(col("<MM/dd/yyyy_field_name>"),"dd/MM/yyyy HH:mm:ss"),"yyyy-MM-dd HH:mm:ss").cast("timestamp"))

4. Using unix_timestamp and casting to the timestamp type:

df.withColumn("<MM/dd/yyyy_field_name>",unix_timestamp(col("<MM/dd/yyyy_field_name>"),"dd/MM/yyyy HH:mm:ss").cast("timestamp"))
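Note that unix_timestamp resolves only to whole seconds, so options 3 and 4 drop any sub-second precision; on Spark >= 2.2 the to_timestamp route in option 2 is the more direct one.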

A more dynamic way:
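For reference, the sample input shown below could be built like this (the column names dt, st1, dt1 and the values are taken from the output that follows; a SparkSession named spark, as in spark-shell, is assumed):

import spark.implicits._

val df = Seq(
  ("12/12/2019 09:23:23", "12/12/2019 09:23:23", "12/12/2019 09:23:23"),
  ("12/13/2018 12:23:23", "12/13/2018 12:23:23", "12/13/2018 12:23:23")
).toDF("dt", "st1", "dt1")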

// input dataframe
df.show()
+-------------------+-------------------+-------------------+
|                 dt|                st1|                dt1|
+-------------------+-------------------+-------------------+
|12/12/2019 09:23:23|12/12/2019 09:23:23|12/12/2019 09:23:23|
|12/13/2018 12:23:23|12/13/2018 12:23:23|12/13/2018 12:23:23|
+-------------------+-------------------+-------------------+
// for every column whose name starts with dt, cast it to timestamp

val df2= df.columns.filter(_.startsWith("dt")).foldLeft(df)((df, c) => {
  df.withColumn(s"$c",unix_timestamp(col(s"$c"),"dd/MM/yyyy HH:mm:ss").cast("timestamp"))
})

df2.show(false)

// the dt and dt1 columns are dynamically converted and cast to timestamp; st1 is left untouched.
//+---------------------+-------------------+---------------------+
//|dt                   |st1                |dt1                  |
//+---------------------+-------------------+---------------------+
//|2019-12-12 09:23:23.0|12/12/2019 09:23:23|2019-12-12 09:23:23.0|
//|2019-01-12 12:23:23.0|12/13/2018 12:23:23|2019-01-12 12:23:23.0|
//+---------------------+-------------------+---------------------+
