apache-spark - 将字典传递给 pyspark udf
问题描述
我是 pyspark 的新手,我正在尝试使用 udf 来映射一些字符串名称。我必须将一些数据值映射到新名称,所以我打算将来自 sparkdf 的列值和映射字段的字典发送到 udf,而不是写大量的.when()
's after .withColumn()
。
尝试仅将 2 个字符串传递给 udf,它可以工作,但传递字典却没有。
def stringToStr_function(checkCol, dict1) :
for key, value in dict1.iteritems() :
if(checkCol != None and checkCol==key): return value
stringToStr_udf = udf(stringToStr_function, StringType())
df = sparkdf.toDF().withColumn(
"new_col",
stringToStr_udf(
lit("REQUEST"),
{"REQUEST": "Requested", "CONFIRM": "Confirmed", "CANCEL": "Cancelled"}
)
)
但是不存在关于方法 col() 的此错误。有任何想法吗?:
File "<stdin>", line 2, in <module>
File "/usr/lib/spark/python/pyspark/sql/functions.py", line 1957, in wrapper
return udf_obj(*args)
File "/usr/lib/spark/python/pyspark/sql/functions.py", line 1918, in __call__
return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
File "/usr/lib/spark/python/pyspark/sql/column.py", line 60, in _to_seq
cols = [converter(c) for c in cols]
File "/usr/lib/spark/python/pyspark/sql/column.py", line 48, in _to_java_column
jcol = _create_column_from_name(col)
File "/usr/lib/spark/python/pyspark/sql/column.py", line 41, in _create_column_from_name
return sc._jvm.functions.col(name)
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 323, in get_return_value
format(target_id, ".", name, value))
Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.util.HashMap]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
谢谢你的帮助。我正在使用 aws 胶水和 Python 2.x,并且正在笔记本中进行测试。
解决方案
如链接的副本所示:
最干净的解决方案是使用闭包传递额外的参数
udf
但是,对于这个特定问题,您不需要 a 。(请参阅Spark 函数与 UDF 性能?)
您可以使用pyspark.sql.functions.when
来实现IF-THEN-ELSE
逻辑:
from pyspark.sql.functions import coalesce, col, lit, when
def stringToStr_function(checkCol, dict1):
return coalesce(
*[when(col(checkCol) == key, lit(value)) for key, value in dict1.iteritems()]
)
df = sparkdf.withColumn(
"new_col",
stringToStr_function(
checkCol = lit("REQUEST"),
dict1 = {"REQUEST": "Requested", "CONFIRM": "Confirmed", "CANCEL": "Cancelled"}
)
)
我们遍历字典中的项目并使用它when
来返回value
if 中的值checkCol
与key
. 将其包装在pyspark.sql.functions.coalesce()
将返回第一个非空值的调用中。
推荐阅读
- postgresql - PostgreSQL 中最有效的数组表示法
- python - 通过检查连续元素对数据框进行切片
- sql - 雪花语法 - 语法中的意外空格字符
- c# - 在 C# 中使用 Gdal 库将 shapefile 转换为 kml
- laravel - Laravel-Excel 2,更快地插入数据
- javascript - 我应该在 Javascript 中使用多个变量还是单个对象?
- instrumentation - 使用 Pintool 检测时重复的函数调用
- php - PHPunit 将读取文件重构为 EOF 测试功能
- cpu-architecture - 虚拟化页表的工作原理
- javascript - javascript循环嵌套数组并基于另一个数组提取属性