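pyspark - Lambda expression + PySpark
Problem description
I am trying to compare a column in a Spark DataFrame against a given date: if the column's date is earlier than the given date, add n hours, otherwise add x hours.
Something like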
import datetime
addhours = lambda x, y: x + datetime.timedelta(hours=14) if x < y else x + datetime.timedelta(hours=10)
where y holds the specified static date, and the function is then applied to the DataFrame column
Something like
df = df.withColumn("newDate", checkDate(df.Time, F.lit('2015-01-01') ))
Here is a sample df
from pyspark.sql import functions as F
import datetime
df = spark.createDataFrame([('America/NewYork', '2020-02-01 10:00:00'),('Africa/Nairobi', '2020-02-01 10:00:00')],["OriginTz", "Time"])
I'm a bit new to Spark DataFrames :)
Solution
Use a when + otherwise expression instead of a udf.
Example:
from pyspark.sql import functions as F
# Cast Time to timestamp and the literal to date so the two can be compared inside when
df = spark.createDataFrame([('America/NewYork', '2020-02-01 10:00:00'),('Africa/Nairobi', '2003-02-01 10:00:00')],["OriginTz", "Time"]).\
withColumn("literal",F.lit('2015-01-01').cast("date")).\
withColumn("Time",F.col("Time").cast("timestamp"))
df.show()
#+---------------+-------------------+----------+
#|       OriginTz|               Time|   literal|
#+---------------+-------------------+----------+
#|America/NewYork|2020-02-01 10:00:00|2015-01-01|
#| Africa/Nairobi|2003-02-01 10:00:00|2015-01-01|
#+---------------+-------------------+----------+
# unix_timestamp converts Time to epoch seconds; add 10 * 3600 (10 hours) or 14 * 3600 (14 hours), then convert back to a timestamp
df.withColumn("new_date",F.when(F.col("Time") > F.col("literal"),F.to_timestamp(F.unix_timestamp(F.col("Time"),'yyyy-MM-dd HH:mm:ss') + 10 * 3600)).\
otherwise(F.to_timestamp(F.unix_timestamp(F.col("Time"),'yyyy-MM-dd HH:mm:ss') + 14 * 3600))).\
show()
#+---------------+-------------------+----------+-------------------+
#|       OriginTz|               Time|   literal|           new_date|
#+---------------+-------------------+----------+-------------------+
#|America/NewYork|2020-02-01 10:00:00|2015-01-01|2020-02-01 20:00:00|
#| Africa/Nairobi|2003-02-01 10:00:00|2015-01-01|2003-02-02 00:00:00|
#+---------------+-------------------+----------+-------------------+
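The epoch arithmetic above can be sanity-checked outside Spark. The following is a minimal plain-Python sketch of the same rule, not part of the original answer; the names `CUTOFF` and `shift_by_cutoff` are hypothetical and chosen only for illustration.

```python
from datetime import datetime, timedelta

CUTOFF = datetime(2015, 1, 1)  # the static date used in the question


def shift_by_cutoff(ts, cutoff=CUTOFF, hours_after=10, hours_before=14):
    """Add hours_after if ts is later than cutoff, otherwise add hours_before."""
    hours = hours_after if ts > cutoff else hours_before
    # Adding hours * 3600 seconds mirrors the unix_timestamp + N * 3600 step
    return ts + timedelta(seconds=hours * 3600)


samples = [datetime(2020, 2, 1, 10), datetime(2003, 2, 1, 10)]
shifted = [shift_by_cutoff(ts) for ts in samples]
for before, after in zip(samples, shifted):
    print(before, "->", after)
# 2020-02-01 10:00:00 -> 2020-02-01 20:00:00
# 2003-02-01 10:00:00 -> 2003-02-02 00:00:00
```

The two printed values match the new_date column in the Spark output, which suggests the when/otherwise branches are wired the intended way round.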
If you don't want to add the literal value as a DataFrame column:
lit_val='2015-01-01'
df = spark.createDataFrame([('America/NewYork', '2020-02-01 10:00:00'),('Africa/Nairobi', '2003-02-01 10:00:00')],["OriginTz", "Time"]).\
withColumn("Time",F.col("Time").cast("timestamp"))
df.withColumn("new_date",F.when(F.col("Time") > F.lit(lit_val).cast("date"),F.to_timestamp(F.unix_timestamp(F.col("Time"),'yyyy-MM-dd HH:mm:ss') + 10 * 3600)).\
otherwise(F.to_timestamp(F.unix_timestamp(F.col("Time"),'yyyy-MM-dd HH:mm:ss') + 14 * 3600))).\
show()
#+---------------+-------------------+-------------------+
#|       OriginTz|               Time|           new_date|
#+---------------+-------------------+-------------------+
#|America/NewYork|2020-02-01 10:00:00|2020-02-01 20:00:00|
#| Africa/Nairobi|2003-02-01 10:00:00|2003-02-02 00:00:00|
#+---------------+-------------------+-------------------+