首页 > 解决方案 > Spark SQL:将JSON格式的毫秒时间戳转换为日期格式

问题描述

Schema 的 dataType 声明为,Timestamp但 spark 作业未将其转换为正确的格式。

Dataset<Row> stream = sparkSession.readStream()
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaBootstrapServersString)
          .option("subscribe", topic)
//          .option("maxOffsetsPerTrigger", 10000)
          .load();

      Dataset<Row> rawStream = stream
              .selectExpr("CAST(value AS STRING)")
              .select(from_json(col("value"), eventSpecificStructType).as("eventData"))
              .select("eventData.*")
              .filter(col("eventType").equalTo("Test"));

传入的时间戳1542126896113转换为50838-01-28 18:49:111.0.
有没有办法将毫秒转换为日期时间格式?

标签: javaapache-sparkapache-spark-sql

解决方案


您必须在 Java 中创建一个 UDF。

import java.sql.Timestamp;
import java.text.SimpleDateFormat;

SimpleDateFormat dateFormat = new SimpleDateFormat("....Date time pattern...");
spark.udf().register("timestamp", new UDF1<String, Timestamp>() {
    private static final long serialVersionUID = 1335972766810808134L;
    @Override
    public Timestamp call(String source)
    {
      try{
            return new Timestamp(dateFormat.parse(source).getTime());
         } catch (ParseException e) {
                 e.printStackTrace();
         }
      }
      return null;
     }
 }, DataTypes.TimestampType);

最后:

stream = stream.withColumn("col", callUDF("timestamp", dataframe.col("col")));

推荐阅读