您的当前位置:首页Spark1.6.3 cache()和persist()

Spark1.6.3 cache()和persist()

2024-12-13 来源:哗拓教育

RDD的持久化也就是说假如我们从hdfs读取文件,形成RDD。当我们对RDD进行持久化操作之后,
,然后再针对该RDD进行action操作(这里我们假设执行count操作,中间可能经历了一系列transformation操作),虽然第一次count()操作执行完了,但是也不会清除掉RDD中的数据,而是将其缓存在内存或者磁盘上。当第二次再执行count操作时,就不会重新从hdfs上读取数据,形成新的RDD,而是直接从RDD所在的所有节点的缓存中直接读取,对数据直接执行count操作,避免了重复计算。所以如果我们合理使用RDD的持久化机制,可以有效提高应用程序的性能。

从源码来看调用cache()方法,内部调用persist()方法 :

 def cache(): this.type = persist()

persist()方法的默认的sotrageLevel 是MEMORY_ONLY,所以cache()方法默认使用内存缓存。

  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

  def persist(newLevel: StorageLevel): this.type = {
 // 如果用户设置了checkpoint,我们需要覆盖旧的storage level。
 //   checkpoint机制会将RDD的数据容错到文件系统上(比如说
hdfs),如果在对RDD进行计算的时候,发现存储的数据不在了,
会先找一下checkpoint数据,如果有就是用checkpoint的数据,就不
需要去计算了。
   if (isLocallyCheckpointed){
     persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
    }else{
      persist(newLevel, allowOverride = false)
   }
显示全文