首页 > 解决方案 > Spark + scala 用于 StringIndexer 多列的新管道

问题描述

我尝试StringIndexer()在多个列上应用,我使用ScalaSpark 2.3。
这是我的代码:

val df1 = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("file:///c:/tmp/spark-warehouse/train.csv")

val feat = df1.columns.filterNot(_ .contains("BsmtFinSF1"))

val inds = feat.map { colName =>
  val indexer1 = new StringIndexer()
    .setInputCol(colName)
    .setOutputCol(colName + "I")
    .fit(df1)

  Array(indexer1)
}

val pipeline = new Pipeline().setStages(inds.toArray)

但是,我有这个错误:

错误:(134, 50) 类型不匹配;

找到:Array[Array[org.apache.spark.ml.feature.StringIndexerModel]]
需要:Array[? <:org.apache.spark.ml.PipelineStage]

注意:数组[org.apache.spark.ml.feature.StringIndexerModel] >: ? <: org.apache.spark.ml.PipelineStage,但类 Array 在类型 T 中是不变的。您可能希望研究通配符类型,例如_ >: ? <: org.apache.spark.ml.PipelineStage. (SLS 3.2.10)
val pipeline = new Pipeline().setStages(inds.toArray)

任何帮助将不胜感激。谢谢你

标签: scalaapache-spark

解决方案


.setStages需要一个Array[PipelineStage],但实际上它变成了Array[Array[PipelineStage]因为你indexer1在这里换成了多余的 Array: Array(indexer1)。Map 函数返回一个相同类型的集合。该集合的元素是由传递给 Map 的函数的应用程序产生的。所以试试这样:

val inds = feat.map { colName =>
   new StringIndexer()
    .setInputCol(colName)
    .setOutputCol(colName + "I")
    .fit(df1)          
}

推荐阅读