首页 > 解决方案 > 在 SparkML Transformer 中缓存数据集

问题描述

我正在尝试在管道中使用Spark ML Transformers 。此管道中的一项任务是使用键将传入数据集连接到现有参考数据join

参考数据很大,但是可以在joinkey上进行预分区。如果我想多次使用这个转换器,但只想将参考数据加载到内存中一次,那么在转换器的生命周期中,我可以缓存参考数据吗?(可以假设参考数据在某处的镶木地板文件中)

标签: scalaapache-sparkmachine-learningapache-spark-ml

解决方案


首先,确保您确实需要在 Spark 层进行缓存。读取 Parquet 速度很快,网络文件系统速度很快,并且操作系统缓冲区缓存非常大。根据您的环境和工作集大小,您可能不需要在 Spark 层进行缓存(这样做甚至可能会损害性能)。

如果您确定通常缓存是有意义的,那么问题就有点棘手了,因为即使有人向您传递了已经缓存的东西,您也想做正确的事情(在这种情况下,您不需要缓存它并且不会'不想在将来的某个时候取消缓存它)。您可以在缓存之前检查数据集是否已缓存:

if (refdata.storageLevel == StorageLevel.NONE) refdata.cache()

以这种方式有条件地缓存引用数据适用于实际构建Transformer,因为在cache访问引用数据之前它不会起作用。

何时取消缓存数据是一个棘手的问题。如果有人(甚至是您)向您传递了缓存的参考数据,那么您就没有缓存它(并且您以后不想取消缓存它,以免让他们感到惊讶)。如果您跟踪是否将参考数据缓存在您的Transformer(例如,在名为 的值中uncached),那么您应该有一个在必要时进行清理的方法,并在完成后显式调用它。

把它们放在一起,你的 Transformer 看起来像这样:

class ExampleModel(private val uncached: Boolean, private val refdata: Broadcast[DataFrame]) extends Model[Example] {

  def this(df: Broadcast[DataFrame]) {
    this(df.value.storageLevel == StorageLevel.NONE, df)
  }

  if(uncached) refdata.value.cache();

  // ...

  def cleanup { if (uncached) { refdata.value.uncache() } ; refdata.destroy() }

}

推荐阅读