
Problem

I am trying to define a pandas udf that creates SparseVectors from a column of dictionaries. Here is an example:

from pyspark.sql import Row
from pyspark.ml.linalg import SparseVector, VectorUDT
from pyspark.sql.functions import *

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

# create example data
dff = spark.createDataFrame([Row(features=Row(indices=[1,2], size=10, values = [11,12])),
                             Row(features=Row(indices=[3,4], size=10, values = [13,14])),
                             Row(features=Row(indices=[5,6,7], size=10, values = [15,16,17]))
                            ])

dff.printSchema()

# access values in the struct
dff.withColumn('sparse', col('features')['size'])

I can access the individual key-value pairs in the features column, so I used rdd.map to create the SparseVectors:

# create sparse vectors using rdd.map works fine
dff.rdd.map(lambda x: SparseVector(x.features['size'],
                                   x.features['indices'],
                                   x.features['values'])).collect()

I would like to do the same thing without going through the rdd. I tried using .withColumn:

# trying using withColumn and SparseVector
dff.withColumn('sparse', SparseVector(col('features')['size'],
                                      col('features')['indices'],
                                      col('features')['values']))

but got the following error:

TypeError: int() argument must be a string, a bytes-like object or a number, not 'Column'

I tried defining udfs, shown below.

# create sparse vector using column of dictionaries and udfs
#@udf
#def create_s_vector(x):
#    return SparseVector(x['size'],x['indices'],x['values'])

# not sure whats the proper returnType
@pandas_udf(VectorUDT(), PandasUDFType.SCALAR)
def create_s_vector(x_iter):
    for x in x_iter:
        yield SparseVector(x['size'],x['indices'],x['values'])

# try using udf
dff.withColumn('sparse', create_s_vector(col('features')))

With the code above, I get an error saying the returnType is not supported. Thanks!

Tags: python, pandas, apache-spark, pyspark, user-defined-functions

Solution
