首页 > 解决方案 > 将浮点列添加到 TimestampType 列(秒+毫秒)

问题描述

我正在尝试将浮点列添加到 pyspark 中的 TimestampType 列,但似乎没有办法在保持小数秒的同时做到这一点。float_seconds 的示例是 19.702300786972046,时间戳的示例是 2021-06-17 04:31:32.48761

我想要的是:

calculated_df = beginning_df.withColumn("calculated_column", float_seconds_col + TimestampType_col)

我尝试了以下方法,但都没有完全解决问题:

#method 1 添加单个时间,但不能用于将整列添加到时间戳列。

calculated_df = beginning_df.withColumn("calculated_column",col("TimestampType_col") + F.expr('INTERVAL 19.702300786 seconds'))

#method 2 将 float 列转换为 unixtime,但去掉小数(这很重要)

timestamp_seconds = beginning_df.select(from_unixtime("float_seconds"))

有问题的两列的图像

标签: pysparkhiveapache-spark-sqltimestamphiveql

解决方案


您可以使用 UDF 来实现它,如下所示:

from datetime import datetime, timedelta

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, StructField, FloatType, TimestampType

spark = SparkSession \
    .builder \
    .appName("StructuredStreamTesting") \
    .getOrCreate()
schema = (StructType([
    StructField('dt', TimestampType(), nullable=True),
    StructField('sec', FloatType(), nullable=True),
]))

item1 = {
    "dt": datetime.fromtimestamp(1611859271.516),
    "sec": 19.702300786,
}
item2 = {
    "dt": datetime.fromtimestamp(1611859271.517),
    "sec": 19.702300787,
}

item3 = {
    "dt": datetime.fromtimestamp(1611859271.518),
    "sec": 19.702300788,
}

df = spark.createDataFrame([item1, item2, item3], schema=schema)

df.printSchema()


@udf(returnType=TimestampType())
def add_time(dt, sec):
    return dt + timedelta(seconds=sec)


df = df.withColumn("new_dt", add_time(col("dt"), col("sec")))
df.printSchema()
df.show(truncate=False)

推荐阅读