python - 创建 SparseVectors 的 Pandas UDF
问题描述
我正在尝试定义一个允许SparseVectors
从字典列创建的 pandas udf。下面是一个例子
from pyspark.sql import Row
from pyspark.ml.linalg import SparseVector, VectorUDT
from pyspark.sql.functions import *
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
# create example data
dff = spark.createDataFrame([Row(features=Row(indices=[1,2], size=10, values = [11,12])),
Row(features=Row(indices=[3,4], size=10, values = [13,14])),
Row(features=Row(indices=[5,6,7], size=10, values = [15,16,17]))
])
print(dff.printSchema())
# access values in the struct
dff.withColumn('sparse', col('features')['size'])
我可以访问 features 列中的单个键值对,因此我使用 rdd.map 来创建SparseVectors
.
# create sparse vectors using rdd.map works fine
dff.rdd.map(lambda x: SparseVector(x.features['size'],
x.features['indices'],
x.features['values'])).collect()
我想在不使用 rdd 的情况下做同样的事情。我尝试使用.withColumn
.
# trying using withColumn and SparseVector
dff.withColumn('sparse', SparseVector(col('features')['size'],
col('features')['indices'],
col('features')['values']))
但得到以下错误。
TypeError: int() argument must be a string, a bytes-like object or a number, not 'Column'
我尝试udf
在下面定义 s 。
# create sparse vector using column of dictionaries and udfs
#@udf
#def create_s_vector(x):
# return SparseVector(x['size'],x['indices'],x['values'])
# not sure whats the proper returnType
@pandas_udf(VectorUDT(), PandasUDFType.SCALAR)
def create_s_vector(x_iter):
for x in x_iter:
yield SparseVector(x['size'],x['indices'],x['values'])
# try using udf
dff.withColumn('sparse', create_s_vector(col('features')))
使用上面的代码,我得到不支持 returnType 类型的错误。谢谢!
解决方案
推荐阅读
- sql - 为 sql 库操作制作通用选择函数
- python - 进行输入并运行循环以创建自定义数量的对象
- jquery - 将表单提交的值附加为 url 参数
- java - 如何修复 fastlane 错误:找不到用于签名配置“externalOverride”的密钥库文件“keystore.jks”。?
- redux - 组件会在 react-redux 中使用 useStore 重新渲染吗?
- javascript - 为什么 Bootstrap Spinner 在计算密集型上下文中无法使用
- swift - 等到方法完成
- windows - 使用 FORFILES 指定输出目录
- java - 为什么我收不到arraylist的内容
- css - 当我使用 zoomIn 动画时,元素正在消失