首页 > 解决方案 > 如果我修改指向缓存 rdd 的变量会发生什么?

问题描述

我有一个缓存的 RDD,我在循环中使用了几次。在某些时候,我修改了指向所述 RDD 的变量,然后调用该变量。

var rdd1 = someRDD.chache

do{
  val rdd2 = rdd1.someLogic
  val rdd3 = rdd1.someOtherLogic
  rdd1 = rdd1.someJoin(rdd4)
  val rdd5 = rdd1.someAggregateFunction
  rdd5.first //to start the transformations
  rdd1.unpersist
} while(someCondition)

我的问题是,此时val rdd5 = rdd1.someAggregateFunction会发生什么rdd1someRDD还是rdd1.someJoin(rdd4)?我需要在某个时候取消它吗?或者如果我以后再次使用它,我是否需要再次缓存它rdd5

编辑像这样的东西

var rdd1 = sc.parallelize(Seq(1, 2, 3, 4)).cache()
rdd1.collect().foreach(println)
rdd1 = sc.parallelize(Seq(4,5,6))
rdd1.collect().foreach(println)

印刷

1
2
3
4
4
5
6

所以价值观发生了变化。它仍然留下了缓存的问题。每次更改时都需要缓存 rdd1 吗?如果是这样,我需要取消它吗?

标签: scalaapache-sparkcaching

解决方案


缓存应该像只读副本一样,不要更改缓存的 RDD。而是创建一个新的 rdd。

据我所知,首先你不应该在 do while 循环中做 unpersist。这应该在 do while 代码之后完成。也不要修改已经缓存的 RDD。我相信这就是你想要的。

var rdd1 = someRDD.chache
var changingRDD = null;
do{
  val rdd2 = rdd1.someLogic
  val rdd3 = rdd1.someOtherLogic
  changingRDD = rdd1.someJoin(rdd4)
  val rdd5 = changingRDD.someAggregateFunction
  rdd5.first //to start the transformations
} while(someCondition)
rdd1.unpersist

推荐阅读