scala - 在 SparkML Transformer 中缓存数据集
问题描述
我正在尝试在管道中使用Spark ML Transformers 。此管道中的一项任务是使用键将传入数据集连接到现有参考数据join
。
参考数据很大,但是可以在join
key上进行预分区。如果我想多次使用这个转换器,但只想将参考数据加载到内存中一次,那么在转换器的生命周期中,我可以缓存参考数据吗?(可以假设参考数据在某处的镶木地板文件中)
解决方案
首先,确保您确实需要在 Spark 层进行缓存。读取 Parquet 速度很快,网络文件系统速度很快,并且操作系统缓冲区缓存非常大。根据您的环境和工作集大小,您可能不需要在 Spark 层进行缓存(这样做甚至可能会损害性能)。
如果您确定通常缓存是有意义的,那么问题就有点棘手了,因为即使有人向您传递了已经缓存的东西,您也想做正确的事情(在这种情况下,您不需要缓存它并且不会'不想在将来的某个时候取消缓存它)。您可以在缓存之前检查数据集是否已缓存:
if (refdata.storageLevel == StorageLevel.NONE) refdata.cache()
以这种方式有条件地缓存引用数据适用于实际构建Transformer
,因为在cache
访问引用数据之前它不会起作用。
何时取消缓存数据是一个棘手的问题。如果有人(甚至是您)向您传递了缓存的参考数据,那么您就没有缓存它(并且您以后不想取消缓存它,以免让他们感到惊讶)。如果您跟踪是否将参考数据缓存在您的Transformer
(例如,在名为 的值中uncached
),那么您应该有一个在必要时进行清理的方法,并在完成后显式调用它。
把它们放在一起,你的 Transformer 看起来像这样:
class ExampleModel(private val uncached: Boolean, private val refdata: Broadcast[DataFrame]) extends Model[Example] {
def this(df: Broadcast[DataFrame]) {
this(df.value.storageLevel == StorageLevel.NONE, df)
}
if(uncached) refdata.value.cache();
// ...
def cleanup { if (uncached) { refdata.value.uncache() } ; refdata.destroy() }
}
推荐阅读
- tensorflow - TypeError:无法将提供的值转换为 EagerTensor。提供的值:0.0 请求的 dtype:int64
- amazon-web-services - 如何解决:'无法解析要使用的 AWS 账户。它必须在定义 CDK 时或通过环境进行配置'
- java - 如何对在具有未知对象参数的构造函数的类中工作的方法进行单元测试(Junit 4)
- python - Pandas 重命名特定列并更改 dtype
- css - 使用 calc() 以分数形式获取宽度的表达式
- deep-learning - ValueError: test_size=0 应该是正数且小于样本数 0 或 (0,1) 范围内的浮点数
- java - 回退到默认序列化的自定义序列化程序
- python - 使用 tkinter 创建一个简单的 GUI,并且有一个小问题
- sql - 返回有限记录的 SQL 语句
- php - PayPal Checkout : LiveEnvironment class not found