python - 添加/减去两个 pyspark CountVectorizer 稀疏向量列
问题描述
我想采用 CountVectorizer 转换的文档对的差异。换句话说,取两列稀疏向量之间的差异。我将相同的转换器应用于 df[doc1] 和 df[doc2],因此结果向量对 (df['X1'] - df['X2']) 的维度将始终保持一致。
from pyspark.ml.feature import RegexTokenizer, CountVectorizer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
# Toy DataFrame: two tokenized text columns plus an integer label.
df = spark.createDataFrame([("homer likes donuts".split(" "), "donuts taste delicious".split(" "), 0),
("five by five boss".split(" "), "five is a number".split(" "), 1)],
["words1", "words2", "label"])
display(df)
cv = CountVectorizer()
# Stack both word columns so one fitted vocabulary covers every token;
# this guarantees X1 and X2 get the same dimensionality.
union_words = df.select(col('words1').alias('words')).union(df.select(col('words2').alias('words')))
cv = CountVectorizer() \
.setInputCol('words') \
.fit(union_words)
# Reuse the single fitted model on each column so the vectors are comparable.
df = cv.setInputCol('words1') \
.setOutputCol('X1') \
.transform(df)
df = cv.setInputCol('words2') \
.setOutputCol('X2') \
.transform(df)
display( df )
X1 X2
[0,11,[1,2,9],[1,1,1]] [0,11,[1,4,8],[1,1,1]]
[0,11,[0,3,10],[2,1,1]] [0,11,[0,5,6,7],[1,1,1,1]]
我无法添加列(列类型不匹配,需要数字或日历间隔)。我尝试了@zero323 的添加函数,但在 isinstance(v1, SparseVector) 处遇到断言错误
# Neither attempt works: '+' on columns requires numeric or calendar-interval
# types, and calling a plain Python add() helper directly trips its
# isinstance(v1, SparseVector) assertion — the function must be wrapped as a
# udf with a VectorUDT return type (see the solution below).
# NOTE(review): the original snippets were missing a closing parenthesis each.
df.withColumn("result", (col("X1") + col("X2")))
df.withColumn("result", add(col("X1"), col("X2")))
在稀疏向量格式中,我希望结果是:
[0,11,[2,4,8,9],[1,-1,-1,1]]
[0,11,[0,3,5,6,7,10],[1,1,-1,-1,-1,1]]
解决方案
需要将函数转换为返回类型为 VectorUDT 的 udf。通过结合多处参考答案中的方案解决(原文中的超链接在转载时已丢失)。
import numpy as np
from pyspark.sql.functions import udf
from pyspark.ml.linalg import SparseVector, Vectors, VectorUDT
from pyspark.ml.feature import RegexTokenizer, CountVectorizer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
# Same toy data as in the question: two tokenized text columns plus a label.
df = spark.createDataFrame(data=[("homer likes donuts".split(" "), ["donuts", "taste", "delicious"], 0),
(["five", "by", "five", "boss"], ["five", "is", "a", "number"], 1)],
schema=["words1", "words2", "label"])
cv = CountVectorizer()
# Fit one vocabulary over the union of both word columns so X1 and X2
# share the same vector size.
union_words = df.select(col('words1').alias('words')).union(df.select(col('words2').alias('words')))
cv = CountVectorizer() \
.setInputCol('words') \
.fit(union_words)
# Apply the single fitted model to each column in turn.
df = cv.setInputCol('words1') \
.setOutputCol('X1') \
.transform(df)
df = cv.setInputCol('words2') \
.setOutputCol('X2') \
.transform(df)
@udf(VectorUDT())
def minus(v1, v2):
    """Return the element-wise difference ``v1 - v2`` of two SparseVectors.

    The result is itself sparse: only indices where the difference is
    non-zero are kept.

    Args:
        v1: SparseVector minuend.
        v2: SparseVector subtrahend; must have the same size as ``v1``.

    Returns:
        SparseVector of the same size holding ``v1[i] - v2[i]``.

    Raises:
        TypeError: if either argument is not a SparseVector.
        ValueError: if the two vectors differ in size.
    """
    # Explicit raises instead of assert: asserts are stripped under `python -O`.
    if not (isinstance(v1, SparseVector) and isinstance(v2, SparseVector)):
        raise TypeError("both arguments must be SparseVector instances")
    if v1.size != v2.size:
        raise ValueError("vectors must have the same size")
    # Union of active indices; everywhere else both vectors are zero.
    indices = set(v1.indices).union(set(v2.indices))
    # Not particularly efficient but we are limited by SPARK-10973
    # (no native vector arithmetic on columns): build index -> value dicts.
    v1d = dict(zip(v1.indices, v1.values))
    v2d = dict(zip(v2.indices, v2.values))
    zero = np.float64(0)
    # Compute each difference once; drop exact zeros so the result stays sparse.
    values = {}
    for i in indices:
        diff = v1d.get(i, zero) - v2d.get(i, zero)
        if diff != zero:
            values[i] = diff
    return Vectors.sparse(v1.size, values)
# Column name fixed to 'X_DIFF' to match the displayed output table
# (the original used 'NAME_X_DIFF', contradicting the shown result).
df = df.withColumn('X_DIFF', minus('X1', 'X2'))
display(df)
+----------------------+--------------------------+-----+---------------------------+--------------------------------+-----------------------------------------------+
|words1 |words2 |label|X1 |X2 |X_DIFF |
+----------------------+--------------------------+-----+---------------------------+--------------------------------+-----------------------------------------------+
|[homer, likes, donuts]|[donuts, taste, delicious]|0 |(11,[1,4,10],[1.0,1.0,1.0])|(11,[1,3,5],[1.0,1.0,1.0]) |(11,[3,4,5,10],[-1.0,1.0,-1.0,1.0]) |
|[five, by, five, boss]|[five, is, a, number] |1 |(11,[0,6,9],[2.0,1.0,1.0]) |(11,[0,2,7,8],[1.0,1.0,1.0,1.0])|(11,[0,2,6,7,8,9],[1.0,-1.0,1.0,-1.0,-1.0,1.0])|
+----------------------+--------------------------+-----+---------------------------+--------------------------------+-----------------------------------------------+
推荐阅读
- node.js - 为什么将异步函数传递给 Node.js Express.js 路由器?
- html - 如何修复引导轮播?
- php - 如何防止codepipeline删除webroot中的文件?
- azure-devops - Azure devops:检查其他版本的状态版本
- python - 为什么 Python 的 defaultdict 对 pickle 的行为很奇怪?
- reactjs - 反应钩子,在选择时设置状态
- kentico - CMS_DocumentAlias 表为空 Kentico
- excel - 将工作目录从 VBA 更改为 UNC
- sharepoint - 工作流程 - 如果那时(电子邮件通知不起作用)
- javascript - 如何设置跨域 cookie nextjs 和 apollo