python - 如何使用自定义 udf 实现来舍入列
问题描述
我有一个 pyspark 数据框,例如:
+-------------------+
| to_return_day|
+-------------------+
| -2.003125|
| -20.96738425925926|
| -2.332546296296296|
| -2.206770833333333|
|-2.9733564814814817|
| 54.71157407407407|
| 51.70229166666667|
|48.666354166666665|
| 9.665497685185185|
| 49.56260416666667|
| 66.68983796296297|
| 49.80550925925926|
| 66.6899074074074|
我想在“to_return_day”>0 时使用 udf 实现向上舍入,在“to_return_day”<0 时实现向下舍入。
我的代码:
from pyspark.sql.functions import udf
@udf("double")
def floor_ceil(col_day):
if col_day > 0:
return ceil(col_day)
else :
return floor(col_day)
spark.udf.register("floor_ceil", floor_ceil)
patron_lending_time.withColumn("to_return_day_round",ceil(col("to_return_day")))\
.show()
我得到
为什么会发生?我该如何修复它?
解决方案
我可能没有完全理解 Q OP 发布的内容。据我了解,OP想要的输出是这样的-
1)对于正值(我取大于等于0),该数字上方最接近的整数值,例如;对于 2.34,它将是 3。
2)对于负值,低于该数字的最接近的整数值,例如;对于 -2.34,它将是 -3。
# Creating the DataFrame
values = [(-2.003125,),(-20.96738425925926,),(-2.332546296296296,),(-2.206770833333333,),
(-2.9733564814814817,),(54.71157407407407,),(51.70229166666667,),(48.666354166666665,),
(9.665497685185185,),(49.56260416666667,),(66.68983796296297,),(49.80550925925926,),
(66.6899074074074,),]
df = sqlContext.createDataFrame(values,['to_return_day',])
df.show()
+-------------------+
| to_return_day|
+-------------------+
| -2.003125|
| -20.96738425925926|
| -2.332546296296296|
| -2.206770833333333|
|-2.9733564814814817|
| 54.71157407407407|
| 51.70229166666667|
| 48.666354166666665|
| 9.665497685185185|
| 49.56260416666667|
| 66.68983796296297|
| 49.80550925925926|
| 66.6899074074074|
+-------------------+
UDF
当使用简单的if-else
语句就足够了时,不需要创建, 。
# Importing relevant functions
from pyspark.sql.functions import ceil, floor, when
df = df.withColumn('to_return_day',when(col('to_return_day') >=0 , ceil(col('to_return_day'))).otherwise(floor(col('to_return_day'))))
df.show()
+-------------+
|to_return_day|
+-------------+
| -3|
| -21|
| -3|
| -3|
| -3|
| 55|
| 52|
| 49|
| 10|
| 50|
| 67|
| 50|
| 67|
+-------------+
如果您只想使用UDF
,则以下代码将起作用。
# Import relevant functions and packages.
from pyspark.sql.functions import udf, col
import math
# Defining a UDF
def round_udf(c):
if c < 0:
return math.floor(c)
else:
return math.ceil(c)
round_udf = udf(round_udf,IntegerType())
df = df.withColumn('to_return_day',round_udf(col('to_return_day')))
推荐阅读
- apache-kafka - 为什么 Encoder 不在包 kafka.serializer 中?
- javascript - 后端服务器必须做什么,客户端做什么
- javascript - 普通 JS 或 Lodash - 用其他对象的值替换对象值
- javascript - 有没有办法在父容器上编写事件侦听器以捕获其所有输入元素上的焦点/模糊事件?
- javascript - 检查所选文本的 execCommand
- java - 如何在流式传输时重命名放置在远程位置的文件
- javascript - D3.js 的鼠标悬停事件在 Leaflet 中不起作用
- android - Kotlin:如何将 ImageView 的内容保存到内部/外部存储器?
- android - Fragment 阻止父布局中的 TextView 渲染
- entity-framework - 如何为通过中间表连接的两个表正确使用 LINQ groupby?