python - Error when calling None.org.apache.spark.sql.execution.python.UserDefinedPythonFunction
Problem description
I have defined some simple functions, for example:
def median_func(xs):
    # Sort a copy, then take the middle element (odd length) or the mean
    # of the two middle elements (even length); // keeps the index an int.
    list_median = sorted(xs)
    mid = len(list_median) // 2
    if len(list_median) % 2 == 0:
        result = (list_median[mid - 1] + list_median[mid]) / 2
    else:
        result = list_median[mid]
    return result
## --------------------- ##
def max_func(xs):
    # Largest element of the list
    list_max = sorted(xs)
    return list_max[-1]
## --------------------- ##
def min_func(xs):
    # Smallest element of the list
    list_min = sorted(xs)
    return list_min[0]
I then wrapped them in UDFs using lambdas:
import pyspark.sql.functions as sf
from pyspark.sql.types import DoubleType, IntegerType

median_udf = sf.udf(lambda xs: median_func(xs), DoubleType())
max_udf = sf.udf(lambda xs: max_func(xs), IntegerType())
min_udf = sf.udf(lambda xs: min_func(xs), DoubleType())
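The three helpers can also be written with built-ins and the standard-library `statistics` module. This is a sketch, not the poster's code: the `float()` casts are an assumption based on the `DoubleType()` declared for `median_udf` and `min_udf`, since, as far as I know, PySpark does not coerce a Python `int` into a `DoubleType` value and the row can come back as null. It does not address the Py4J error itself, which is raised on the driver before any row is processed.

```python
import statistics


def median_func(xs):
    # statistics.median averages the two middle values for even-length input,
    # the same rule as the hand-written helper. float() keeps the result a
    # Python float to match the DoubleType declared for median_udf (assumption).
    return float(statistics.median(xs))


def max_func(xs):
    # Same result as sorted(xs)[-1] without sorting the whole list.
    return max(xs)


def min_func(xs):
    # min_udf is declared DoubleType, so cast here too (assumes numeric input).
    return float(min(xs))
```

`statistics.median` raises `StatisticsError` on an empty list, where the original helper would raise `IndexError`.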
I apply the UDFs in PySpark like this:
data_frame = data_frame.withColumn("Rolling_median_lat", median_udf(column_latitude))\
.withColumn("Rolling_median_lon", median_udf(column_longitude))\
.withColumn("Rolling_max_deltatime", max_udf(column_deltatime))
When I run the above with Python 2.7 and PySpark 2.2.0, everything works fine. But when I try the same code with Python 3.6, I get the following error:
Py4JError: An error occurred while calling None.org.apache.spark.sql.execution.python.UserDefinedPythonFunction. Trace:
py4j.Py4JException: Constructor org.apache.spark.sql.execution.python.UserDefinedPythonFunction([class java.lang.String, class org.apache.spark.api.python.PythonFunction, class org.apache.spark.sql.types.DoubleType$, class java.lang.Integer, class java.lang.Boolean]) does not exist
at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
at py4j.Gateway.invoke(Gateway.java:235)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
I'm not sure what the problem is. I have tried some trial and error (not sure it's worth mentioning here), but nothing worked. What am I doing wrong?
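The trace says Py4J looked for a JVM constructor of `UserDefinedPythonFunction` taking `(String, PythonFunction, DoubleType$, Integer, Boolean)` and found none. One plausible cause, and only an assumption since the post does not show the installed versions, is that the Python 3.6 environment imports a different `pyspark` release than the Spark 2.2.0 JVM it talks to, so the Python side calls a constructor signature the JVM does not have. The helpers below (`major_minor`, `versions_match` are hypothetical names) sketch a quick check; comparing major.minor is a heuristic, not an official compatibility rule.

```python
def major_minor(version):
    """Return (major, minor) from a version string such as '2.2.0' or '2.3.0.post1'."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def versions_match(python_side, jvm_side):
    """True when the pyspark package and the Spark JVM share a major.minor release."""
    return major_minor(python_side) == major_minor(jvm_side)


# Hypothetical usage inside the failing Python 3.6 session
# (assumes a SparkSession named `spark`):
#   import pyspark
#   print(pyspark.__version__, spark.version)
#   print(versions_match(pyspark.__version__, spark.version))
```

If the two versions differ, installing the `pyspark` package that matches the cluster's Spark version in the Python 3.6 environment would be the first thing to try.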
Edit:
The variable I defined:
column_latitude
which gives:
Column<b'array(latitude, Lag_latitude_1, Lead_latitude_1, Lag_latitude_2, Lead_latitude_2, Lag_latitude_3, Lead_latitude_3, Lag_latitude_4, Lead_latitude_4)'>
So it is simply an array column built from these nine columns.
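An array column like `column_latitude` can be built from a generated list of column names. This is a sketch: `window_columns` is a hypothetical helper whose naming pattern is read off the column names printed in the DataFrame below. The `stem` parameter exists because the delta-time base column is `deltaTime_sec` while its neighbours are `Lag_deltatime_i`; whether the poster's `column_deltatime` includes `deltaTime_sec` is an assumption.

```python
def window_columns(base, stem=None, k=4):
    """Hypothetical helper: `base`, then Lag_<stem>_i and Lead_<stem>_i for i = 1..k.

    `stem` defaults to `base`; pass it when the neighbour columns use a
    different name than the base column (e.g. deltaTime_sec vs deltatime).
    """
    stem = stem or base
    names = [base]
    for i in range(1, k + 1):
        names.append("Lag_{}_{}".format(stem, i))
        names.append("Lead_{}_{}".format(stem, i))
    return names


# Usage in PySpark (sf = pyspark.sql.functions; array() accepts column names):
#   column_latitude = sf.array(*window_columns("latitude"))
#   column_deltatime = sf.array(*window_columns("deltaTime_sec", stem="deltatime"))
```

For `"latitude"` this yields the same nine names, in the same order, as the `array(...)` expression printed above.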
Edit: here is my original DataFrame:
data_frame.head(2)
which gives me:
[Row(id=1234, movementdatetime=datetime.datetime(2017, 9, 4, 13, 57, 16), latitude=38.477, longitude=13.256, deltaTime_sec=3459, Lag_latitude_1=38.4593, Lead_latitude_1=38.4872, Lag_longitude_1=13.4902, Lead_longitude_1=13.1767, Lag_deltatime_1=25531, Lead_deltatime_1=1212, Lag_latitude_2=38.3432, Lead_latitude_2=39.5649, Lag_longitude_2=15.1879, Lead_longitude_2=2.6392, Lag_deltatime_2=3280, Lead_deltatime_2=20623078, Lag_latitude_3=38.331, Lead_latitude_3=39.5649, Lag_longitude_3=15.3842, Lead_longitude_3=2.6392, Lag_deltatime_3=3588, Lead_deltatime_3=14580, Lag_latitude_4=38.324, Lead_latitude_4=39.5649, Lag_longitude_4=15.6001, Lead_longitude_4=2.6391, Lag_deltatime_4=0, Lead_deltatime_4=7199),
Row(id=2345, movementdatetime=datetime.datetime(2017, 9, 4, 14, 17, 28), latitude=38.4872, longitude=13.1767, deltaTime_sec=1212, Lag_latitude_1=38.477, Lead_latitude_1=39.5649, Lag_longitude_1=13.256, Lead_longitude_1=2.6392, Lag_deltatime_1=3459, Lead_deltatime_1=20623078, Lag_latitude_2=38.4593, Lead_latitude_2=39.5649, Lag_longitude_2=13.4902, Lead_longitude_2=2.6392, Lag_deltatime_2=25531, Lead_deltatime_2=14580, Lag_latitude_3=38.3432, Lead_latitude_3=39.5649, Lag_longitude_3=15.1879, Lead_longitude_3=2.6391, Lag_deltatime_3=3280, Lead_deltatime_3=7199, Lag_latitude_4=38.331, Lead_latitude_4=39.5649, Lag_longitude_4=15.3842, Lead_longitude_4=2.6391, Lag_deltatime_4=3588, Lead_deltatime_4=10803)]
Basically, I have multiple columns (nine of them) over which I need to compute the median.
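To sanity-check whatever the UDFs return once they run, the expected values for the first row (id=1234) can be computed in plain Python from the numbers printed above. This sketch assumes the array columns are ordered base value, Lag_1, Lead_1, ..., Lag_4, Lead_4, as in the `column_latitude` expression, and that the delta-time array starts with `deltaTime_sec`.

```python
import statistics

# Values from the first Row printed above (id=1234), in array-column order:
# base value, then Lag_1, Lead_1, Lag_2, Lead_2, Lag_3, Lead_3, Lag_4, Lead_4.
latitude = [38.477, 38.4593, 38.4872, 38.3432, 39.5649, 38.331, 39.5649, 38.324, 39.5649]
longitude = [13.256, 13.4902, 13.1767, 15.1879, 2.6392, 15.3842, 2.6392, 15.6001, 2.6391]
deltatime = [3459, 25531, 1212, 3280, 20623078, 3588, 14580, 0, 7199]

expected = {
    "Rolling_median_lat": statistics.median(latitude),
    "Rolling_median_lon": statistics.median(longitude),
    "Rolling_max_deltatime": max(deltatime),
}
print(expected)

# With a fixed, odd count of nine values the median is just the middle
# element of the sorted list (index 4); no averaging is involved.
assert sorted(latitude)[4] == expected["Rolling_median_lat"]
```

For this row the values are 38.477, 13.256 and 20623078. Because the count is fixed at nine, `sf.sort_array(column_latitude)[4]` should give the same per-row median using only built-in Spark functions, avoiding the Python UDF path entirely; this is a suggestion that the post itself does not test.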