首页 > 技术文章 > 自定义 spark transformer 和 estimator 的范例

wdmx 2018-11-19 00:32 原文

https://www.oreilly.com/learning/extend-spark-ml-for-your-own-modeltransformer-types

要了解有关Spark ML所基于的数据集API的未来的更多信息,请查看Holden Karau和Seth Hendrickson的会话Spark Structured Streaming,以便在2017年3月14日至16日在Strata + Hadoop World San Jose 进行机器学习。是Python更多你的事情?2017年2月9日,请参阅Karau在2017年东部Spark Summit的调试 PySpark的演讲。

您还可以在Holden Karau和Rachel Warren的“ 高性能Spark:扩展和优化Apache Spark的最佳实践 ”中了解更多信息。

虽然Spark ML管道具有多种算法,但您可能会发现自己需要其他功能而无需离开管道模型。在Spark MLlib中,这不是什么大问题 - 您可以使用RDD转换手动实现算法并继续从那里开始。对于Spark ML管道,同样的方法可以工作,但是我们失去了一些很好的集成管道属性,包括自动运行元算法的能力,例如交叉验证参数搜索。在本文中,您将学习如何使用标准wordcount示例作为起点扩展Spark ML管道模型(人们永远无法逃避大数据wordcount示例的介绍)。

要将自己的算法添加到Spark管道,您需要实现Estimator或Transformer实现PipelineStage接口。对于不需要培训的算法,您可以实现Transformer接口,对于经过培训的算法,您可以实现Estimator接口org.apache.spark.ml(两者都实现基础PipelineStage)。请注意,培训不仅限于复杂的机器学习模型; 甚至MinMaxScaler也需要培训来确定范围。如果他们需要培训,他们必须建造Estimator而不是Transformer。

STRATA数据会议

2014年3月25日至28日在旧金山举行的Strata数据会议
最优价格将于1月11日结束
注意
PipelineStage直接使用不起作用,因为在管道内使用了适合的反射,假设所有阶段都是a Estimator或a Transformer。
除了显而易见的transform或fit函数之外,所有管道阶段都需要提供transformSchema,copy构造函数或实现一个类,它为您提供这些 - 用于copy制作当前阶段的副本,任何新指定的参数合并在一起,并且可以简单地调用defaultCopy(除非你的类有特殊的构造函数注意事项)。

显示了流水线​​阶段的开始以及复制委托 - transformSchema必须根据任何参数集和输入模式生成管道阶段的预期输出。大多数管道阶段只需添加新字段; 如果需要,很少删除以前的字段,但这有时会导致记录包含的数据多于下游所需的数据,从而对性能产生负面影响。如果您发现这是管道中的问题,您可以创建自己的阶段以删除不必要的字段。

class HardCodedWordCountStage(override val uid: String) extends Transformer {
def this() = this(Identifiable.randomUID("hardcodedwordcount"))

def copy(extra: ParamMap): HardCodedWordCountStage = {
defaultCopy(extra)
}
除了生成输出模式之外,该transformSchema函数还应验证输入模式是否适合该阶段(例如,输入列是预期类型)。

这也是您应该对阶段参数执行验证的地方。

transformSchema具有硬编码输入和输出列的字符串输入和矢量输出的简单说明如下。

override def transformSchema(schema: StructType): StructType = {
// Check that the input type is a string
val idx = schema.fieldIndex("happy_pandas")
val field = schema.fields(idx)
if (field.dataType != StringType) {
throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
}
// Add the return field
schema.add(StructField("happy_panda_counts", IntegerType, false))
}
使用该Transformer接口可以非常简单地实现不需要训练的算法。由于这是最简单的流水线阶段,因此您可以从实现一个简单的变换器开始,该变换器计算输入列上的字数。

def transform(df: Dataset[_]): DataFrame = {
val wordcount = udf { in: String => in.split(" ").size }
df.select(col("*"),
wordcount(df.col("happy_pandas")).as("happy_panda_counts"))
}
要充分利用管道接口,您需要使用params接口配置管道阶段。

O'Reilly数据通讯
获取O'Reilly数据通讯
每周接收业内人士的见解 - 以及有关数据主题的独家内容,优惠等。

你的邮件

国家

订阅
请阅读我们的隐私政策。
虽然params接口是公共的,但遗憾的是,Spark中常用的常见默认参数是私有的,因此最终会有一些代码重复。除了允许用户指定值之外,参数还可以包含一些基本验证逻辑(例如,正则化参数必须设置为非负值)。两个最常见的参数是输入列和输出列,您可以相对简单地添加到模型中。

除了字符串参数之外,还可以使用任何其他类型,包括停用词之类的字符串列表,以及停用词之类的字符串。

class ConfigurableWordCount(override val uid: String) extends Transformer {
final val inputCol= new Param[String](this, "inputCol", "The input column")
final val outputCol = new Param[String](this, "outputCol", "The output column")

; def setInputCol(value: String): this.type = set(inputCol, value)

def setOutputCol(value: String): this.type = set(outputCol, value)

def this() = this(Identifiable.randomUID("configurablewordcount"))

def copy(extra: ParamMap): HardCodedWordCountStage = {
defaultCopy(extra)
}

override def transformSchema(schema: StructType): StructType = {
// Check that the input type is a string
val idx = schema.fieldIndex($(inputCol))
val field = schema.fields(idx)
if (field.dataType != StringType) {
throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
}
// Add the return field
schema.add(StructField($(outputCol), IntegerType, false))
}

def transform(df: Dataset[_]): DataFrame = {
val wordcount = udf { in: String => in.split(" ").size }
df.select(col("*"), wordcount(df.col($(inputCol))).as($(outputCol)))
}
}
需要训练的算法可以使用Estimator接口实现- 尽管对于许多算法,org.apache.spark.ml.Predictor或者org.apache.spark.ml.classificationClassifier辅助类更容易实现。Estimator和Transformer接口之间的主要区别在于,不是直接在输入上表达转换,而是首先以train函数的形式进行训练。字符串索引器是您可以实现的最简单的估算器之一,虽然它已经在Spark中可用,但仍然是如何使用估计器接口的一个很好的例证。

trait SimpleIndexerParams extends Params {
final val inputCol= new Param[String](this, "inputCol", "The input column")
final val outputCol = new Param[String](this, "outputCol", "The output column")
}

class SimpleIndexer(override val uid: String) extends Estimator[SimpleIndexerModel] with SimpleIndexerParams {

def setInputCol(value: String) = set(inputCol, value)

def setOutputCol(value: String) = set(outputCol, value)

def this() = this(Identifiable.randomUID("simpleindexer"))

override def copy(extra: ParamMap): SimpleIndexer = {
defaultCopy(extra)
}

override def transformSchema(schema: StructType): StructType = {
// Check that the input type is a string
val idx = schema.fieldIndex($(inputCol))
val field = schema.fields(idx)
if (field.dataType != StringType) {
throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
}
// Add the return field
schema.add(StructField($(outputCol), IntegerType, false))
}

override def fit(dataset: Dataset[]): SimpleIndexerModel = {
import dataset.sparkSession.implicits.

val words = dataset.select(dataset($(inputCol)).as[String]).distinct
.collect()
new SimpleIndexerModel(uid, words)
; }
}

class SimpleIndexerModel(
override val uid: String, words: Array[String]) extends Model[SimpleIndexerModel] with SimpleIndexerParams {

override def copy(extra: ParamMap): SimpleIndexerModel = {
defaultCopy(extra)
}

private val labelToIndex: Map[String, Double] = words.zipWithIndex.
map{case (x, y) => (x, y.toDouble)}.toMap

override def transformSchema(schema: StructType): StructType = {
// Check that the input type is a string
val idx = schema.fieldIndex($(inputCol))
val field = schema.fields(idx)
if (field.dataType != StringType) {
throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
}
// Add the return field
schema.add(StructField($(outputCol), IntegerType, false))
}

override def transform(dataset: Dataset[_]): DataFrame = {
val indexer = udf { label: String => labelToIndex(label) }
dataset.select(col("*"),
indexer(dataset($(inputCol)).cast(StringType)).as($(outputCol)))
}
}
如果要实现迭代算法,您可能希望在未缓存输入数据时自动缓存输入数据,或者允许用户指定持久性级别。

O'REILLY在线学习

学得更快。深入挖掘。再看一点。
加入O'Reilly的在线学习平台。立即免费试用,即时查找答案,或掌握新的有用的东西。

学到更多
所述Predictor接口将两个最常用的参数(输入和输出列)作为标签栏,设有柱,和预测列和自动处理的模式转换为我们。

该Classifier界面做许多事一样,但它也增加了rawPredictionColumn,并提供工具来检测(班数getNumClasses),以及输入转换DataFrame到一个RDD LabeledPoints(使它更容易包装旧的MLlib分类算法)。

如果要实现回归或群集接口,则不需要使用公共基本接口集,因此您需要使用通用Estimator接口。

// Simple Bernouli Naive Bayes classifier - no sanity checks for brevity
// Example only - not for production use.
class SimpleNaiveBayes(val uid: String)
extends Classifier[Vector,SimpleNaiveBayes,SimpleNaiveBayesModel] {

def this() = this(Identifiable.randomUID("simple-naive-bayes"))

override def train(ds: Dataset[]): SimpleNaiveBayesModel = {
import ds.sparkSession.implicits.

ds.cache()
// Note: you can use getNumClasses and extractLabeledPoints to get an RDD instead
// Using the RDD approach is common when integrating with legacy machine learning code
// or iterative algorithms which can create large query plans.
// Here we use Datasets since neither of those apply.

// Compute the number of documents
val numDocs = ds.count
// Get the number of classes.
// Note this estimator assumes they start at 0 and go to numClasses
val numClasses = getNumClasses(ds)
// Get the number of features by peaking at the first row
val numFeatures: Integer = ds.select(col($(featuresCol))).head
  .get(0).asInstanceOf[Vector].size
// Determine the number of records for each class
val groupedByLabel = ds.select(col($(labelCol)).as[Double]).groupByKey(x => x)
val classCounts = groupedByLabel.agg(count("*").as[Long])
  .sort(col("value")).collect().toMap
// Select the labels and features so we can more easily map over them.
// Note: we do this as a DataFrame using the untyped API because the Vector
// UDT is no longer public.
val df = ds.select(col($(labelCol)).cast(DoubleType), col($(featuresCol)))
// Figure out the non-zero frequency of each feature for each label and
// output label index pairs using a case clas to make it easier to work with.
val labelCounts: Dataset[LabeledToken] = df.flatMap {
  case Row(label: Double, features: Vector) =>
    features.toArray.zip(Stream from 1)
      .filter{vIdx => vIdx._2 == 1.0}
      .map{case (v, idx) => LabeledToken(label, idx)}
}
// Use the typed Dataset aggregation API to count the number of non-zero
// features for each label-feature index.
val aggregatedCounts: Array[((Double,Integer),Long)] = labelCounts
  .groupByKey(x => (x.label, x.index))
  .agg(count("*").as[Long]).collect()

val theta = Array.fill(numClasses)(new Array[Double](numFeatures))

// Compute the denominator for the general prioirs
val piLogDenom = math.log(numDocs + numClasses)
// Compute the priors for each class
val pi = classCounts.map{case(_, cc) =>
  math.log(cc.toDouble) - piLogDenom }.toArray

// For each label/feature update the probabilities
aggregatedCounts.foreach{case ((label, featureIndex), count) =>
  // log of number of documents for this label + 2.0 (smoothing)
  val thetaLogDenom = math.log(
    classCounts.get(label).map(_.toDouble).getOrElse(0.0) + 2.0)
  theta(label.toInt)(featureIndex) = math.log(count + 1.0) - thetaLogDenom
}
// Unpersist now that we are done computing everything
ds.unpersist()
// Construct a model
new SimpleNaiveBayesModel(uid, numClasses, numFeatures, Vectors.dense(pi),
  new DenseMatrix(numClasses, theta(0).length, theta.flatten, true))

}

override def copy(extra: ParamMap) = {
defaultCopy(extra)
}
}

// Simplified Naive Bayes Model
case class SimpleNaiveBayesModel(
override val uid: String,
override val numClasses: Int,
override val numFeatures: Int,
val pi: Vector,
val theta: DenseMatrix) extends
ClassificationModel[Vector,SimpleNaiveBayesModel] {

override def copy(extra: ParamMap) = {
defaultCopy(extra)
}

// We have to do some tricks here because we are using Spark's
// Vector/DenseMatrix calculations - but for your own model don't feel
// limited to Spark's native ones.
val negThetaArray = theta.values.map(v => math.log(1.0 - math.exp(v)))
val negTheta = new DenseMatrix(numClasses, numFeatures, negThetaArray, true)
val thetaMinusNegThetaArray = theta.values.zip(negThetaArray)
.map{case (v, nv) => v - nv}
val thetaMinusNegTheta = new DenseMatrix(
numClasses, numFeatures, thetaMinusNegThetaArray, true)
val onesVec = Vectors.dense(Array.fill(theta.numCols)(1.0))
val negThetaSum: Array[Double] = negTheta.multiply(onesVec).toArray

// Here is the prediciton functionality you need to implement - for ClassificationModels
// transform automatically wraps this - but if you might benefit from broadcasting your model or
// other optimizations you can also override transform.
def predictRaw(features: Vector): Vector = {
// Toy implementation - use BLAS or similar instead
// the summing of the three vectors but the functionality isn't exposed.
Vectors.dense(thetaMinusNegTheta.multiply(features).toArray.zip(pi.toArray)
.map{case (x, y) => x + y}.zip(negThetaSum).map{case (x, y) => x + y}
)
}
}

推荐阅读