Remove the least used words from documents in Spark

Problem description

I have a dataframe that looks like this:

  private val sample = Seq(
    (1, "A B C D E"),
    (1, "B C D"),
    (1, "B C D E"),
    (1, "B C D F"),
    (1, "A B C"),
    (1, "B C E F G")
  )
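
For reference, the dataframe used below can be built from this Seq roughly as follows; the column names clusterId and regexTransformedColumn are assumptions inferred from the pipeline input column and the output shown further down.

import spark.implicits._

// Assumed setup: build the input dataframe from the sample data
// ("spark" is an assumed SparkSession; column names are inferred, not given).
val regexTransformedLabel = sample.toDF("clusterId", "regexTransformedColumn")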

I want to remove the least used words from the dataframe. To find them, I compute TF-IDF:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}

// Create the Tokenizer step
val tokenizer = new Tokenizer()
  .setInputCol("regexTransformedColumn")
  .setOutputCol("words")

// Create TF
val hashingTF = new HashingTF()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("rawFeatures")

// Create TF IDF
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")

// Create the pipeline
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, idf))

val lrModel = pipeline.fit(regexTransformedLabel)

val lrOutput = lrModel.transform(regexTransformedLabel)

I get output like the following:

+---------+---------------+---------------------------------------------------------------+------------------------------------------------------------------------------------------------------------+
|clusterId|words          |rawFeatures                                                    |features                                                                                                    |
+---------+---------------+---------------------------------------------------------------+------------------------------------------------------------------------------------------------------------+
|1        |[a, b, c, d, e]|(262144,[17222,27526,28698,30913,227410],[1.0,1.0,1.0,1.0,1.0])|(262144,[17222,27526,28698,30913,227410],[0.5596157879354227,0.3364722366212129,0.0,0.0,0.8472978603872037])|
|1        |[b, c, d]      |(262144,[27526,28698,30913],[1.0,1.0,1.0])                     |(262144,[27526,28698,30913],[0.3364722366212129,0.0,0.0])                                                   |
|1        |[b, c, d, e]   |(262144,[17222,27526,28698,30913],[1.0,1.0,1.0,1.0])           |(262144,[17222,27526,28698,30913],[0.5596157879354227,0.3364722366212129,0.0,0.0])                          |
|1        |[b, c, d, f]   |(262144,[24152,27526,28698,30913],[1.0,1.0,1.0,1.0])           |(262144,[24152,27526,28698,30913],[0.8472978603872037,0.3364722366212129,0.0,0.0])                          |
|1        |[a, b, c]      |(262144,[28698,30913,227410],[1.0,1.0,1.0])                    |(262144,[28698,30913,227410],[0.0,0.0,0.8472978603872037])                                                  |
|1        |[b, c, e, f, g]|(262144,[17222,24152,28698,30913,51505],[1.0,1.0,1.0,1.0,1.0]) |(262144,[17222,24152,28698,30913,51505],[0.5596157879354227,0.8472978603872037,0.0,0.0,1.252762968495368])  |
+---------+---------------+---------------------------------------------------------------+------------------------------------------------------------------------------------------------------------+
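
(For reference: Spark's IDF is computed as idf(t) = log((N + 1) / (df(t) + 1)), where N is the number of rows, 6 here, and df(t) is the number of rows containing t. That is why b and c, which occur in every row, score 0.0, while g, which occurs in a single row, scores log(7/2) ≈ 1.253.)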

But how can I map the transformed features back to the words, so that I can remove the least used ones?

I plan to remove words using a max-features threshold: any word whose TF-IDF value exceeds the threshold is dropped. If I set the threshold to 0.6, then A (0.8) and G (1.2) should be removed from the dataframe. But I am unable to convert the features back into words so that I can remove the least used ones.

Tags: scala, apache-spark, apache-spark-sql, apache-spark-mllib

Solution


Given your example, I would use CountVectorizer and CountVectorizerModel instead. Since HashingTF is a hashing method, the mapping is irreversible: you cannot recover the original tokens from the feature indices.

This does mean you have to fit two models: one for the CountVectorizer and one for the IDF.

In this example the vocabulary is a local array, so if you want to run this on a cluster it is better to make it a broadcast variable (a broadcast variant is sketched after the UDF below).

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, IDF, Tokenizer}
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.sql.functions.udf

// Create the Tokenizer step
val tokenizer = new Tokenizer()
  .setInputCol("regexTransformedColumn")
  .setOutputCol("words")

// Create CountVectorizer to build the label vocabulary
val countVectorizer = new CountVectorizer()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("rawFeatures")
  .setMinDF(1)

// Combine into count vectorizer pipeline
val cvPipeline = new Pipeline()
  .setStages(Array(tokenizer, countVectorizer))

// Create pipeline for token & count vectorizer (TF)
val pipelineModel = cvPipeline.fit(regexTransformedLabel)

// Extract vocabulary (terms ordered by descending corpus frequency;
// here Array(c, b, d, e, a, f, g), judging by the output below)
val vocabulary = pipelineModel.stages.last.asInstanceOf[CountVectorizerModel].vocabulary

// Transform the dataset to TF dataset
val termFrequencyData = pipelineModel.transform(regexTransformedLabel)

// Create IDF
val idf = new IDF().setInputCol(countVectorizer.getOutputCol).setOutputCol("features")

// Fit the IDF on the TF data
val lrModel = idf.fit(termFrequencyData)

// Transform the TF data into TF-IDF data
val lrOutput = lrModel.transform(termFrequencyData)


// UDF that keeps only the words whose TF-IDF value is below the threshold,
// i.e. drops the least used (highest-IDF) words
def removeLeastUsed(threshold: Double) = udf((features: SparseVector) => {
  (features.indices zip features.values) filter (_._2 < threshold) map {
    case (index, _) => vocabulary(index)
  }
})
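
As noted above, vocabulary is captured in the UDF closure as a plain local array. A minimal sketch of the broadcast variant, assuming a SparkSession named spark:

// Broadcast the vocabulary so each executor receives it once
// instead of shipping it with every task ("spark" is an assumed SparkSession).
val vocabularyBc = spark.sparkContext.broadcast(vocabulary)

def removeLeastUsedBc(threshold: Double) = udf((features: SparseVector) => {
  (features.indices zip features.values) filter (_._2 < threshold) map {
    case (index, _) => vocabularyBc.value(index)
  }
})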


lrOutput
  .select(
    'regexTransformedColumn,
    'features,
    removeLeastUsed(0.6)('features).as("mostUsedWords")
  )
  .show(false)

Output:

+----------------------+----------------------------------------------------------------------------------+-------------+
|regexTransformedColumn|features                                                                          |mostUsedWords|
+----------------------+----------------------------------------------------------------------------------+-------------+
|A B C D E             |(7,[0,1,2,3,4],[0.0,0.0,0.3364722366212129,0.5596157879354227,0.8472978603872037])|[c, b, d, e] |
|B C D                 |(7,[0,1,2],[0.0,0.0,0.3364722366212129])                                          |[c, b, d]    |
|B C D E               |(7,[0,1,2,3],[0.0,0.0,0.3364722366212129,0.5596157879354227])                     |[c, b, d, e] |
|B C D F               |(7,[0,1,2,5],[0.0,0.0,0.3364722366212129,0.8472978603872037])                     |[c, b, d]    |
|A B C                 |(7,[0,1,4],[0.0,0.0,0.8472978603872037])                                          |[c, b]       |
|B C E F G             |(7,[0,1,3,5,6],[0.0,0.0,0.5596157879354227,0.8472978603872037,1.252762968495368]) |[c, b, e]    |
+----------------------+----------------------------------------------------------------------------------+-------------+
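
To actually drop the least used words from the original text while preserving the original word order, one option (a sketch, not part of the original answer) is to filter the tokenized words column against the kept set:

// Hypothetical helper: keep only the tokens that survived the threshold filter.
val keepWords = udf((words: Seq[String], kept: Seq[String]) => words.filter(kept.contains))

lrOutput
  .select(
    'regexTransformedColumn,
    keepWords('words, removeLeastUsed(0.6)('features)).as("cleanedWords")
  )
  .show(false)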
