pyspark - 将浮点列添加到 TimestampType 列(秒+毫秒)
问题描述
我正在尝试将浮点列添加到 pyspark 中的 TimestampType 列,但似乎没有办法在保持小数秒的同时做到这一点。float_seconds 的示例是 19.702300786972046,时间戳的示例是 2021-06-17 04:31:32.48761
我想要的是:
calculated_df = beginning_df.withColumn("calculated_column", float_seconds_col + TimestampType_col)
我尝试了以下方法,但都没有完全解决问题:
#method 1 添加单个时间,但不能用于将整列添加到时间戳列。
calculated_df = beginning_df.withColumn("calculated_column",col("TimestampType_col") + F.expr('INTERVAL 19.702300786 seconds'))
#method 2 将 float 列转换为 unixtime,但去掉小数(这很重要)
timestamp_seconds = beginning_df.select(from_unixtime("float_seconds"))
解决方案
您可以使用 UDF 来实现它,如下所示:
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, StructField, FloatType, TimestampType
spark = SparkSession \
.builder \
.appName("StructuredStreamTesting") \
.getOrCreate()
schema = (StructType([
StructField('dt', TimestampType(), nullable=True),
StructField('sec', FloatType(), nullable=True),
]))
item1 = {
"dt": datetime.fromtimestamp(1611859271.516),
"sec": 19.702300786,
}
item2 = {
"dt": datetime.fromtimestamp(1611859271.517),
"sec": 19.702300787,
}
item3 = {
"dt": datetime.fromtimestamp(1611859271.518),
"sec": 19.702300788,
}
df = spark.createDataFrame([item1, item2, item3], schema=schema)
df.printSchema()
@udf(returnType=TimestampType())
def add_time(dt, sec):
return dt + timedelta(seconds=sec)
df = df.withColumn("new_dt", add_time(col("dt"), col("sec")))
df.printSchema()
df.show(truncate=False)
推荐阅读
- android - Android 9 Intent.ACTION_UNINSTALL_PACKAGE 不起作用
- sql - 如何判断 ALTER PROCEDURE 是否有效?
- sql-server - 重复 ID 不是行 ID
- amazon-web-services - 每次添加消息时 AWS Dead Letter Queue Cloudwatch 警报
- javascript - VueJs - 将字符串放在 img src 属性中
- wordpress - 损坏的分类链接 WordPress
- perl - 在 perl 中声明匿名数组集
- javascript - 一步减少和排序对象数组
- amazon-web-services - Kinesis 和 SQS 有什么区别?
- ruby - Ruby条件不在reduce方法内部触发