自学内容网 自学内容网

Spark 中 cache、persist 和 checkpoint 优化数据处理的三种重要机制介绍

在 Spark 中,cachepersistcheckpoint 是优化数据处理的三种重要机制。它们都旨在减少数据重算和优化性能,但有各自的应用场景和实现原理。下面从源码角度分析其原理、作用和适用场景。


1. 基本概念和作用

机制作用存储介质
cache将数据存储在内存中以加快后续计算速度。默认存储在内存
persist提供多种存储级别(如内存和磁盘),支持灵活选择存储策略。内存、磁盘、堆外内存等
checkpoint将数据保存到可靠的存储系统(如 HDFS),提供容错能力,打断 DAG 依赖链。HDFS 或其他持久化存储

2. 核心原理

2.1 Cache

cachepersist 的简化版,其底层实现直接调用 persist(StorageLevel.MEMORY_AND_DISK),默认将数据存储在内存中,如果内存不足,则溢写到磁盘。

源码分析

  • 在 RDD 中,cache() 的代码:
    def cache(): this.type = persist(StorageLevel.MEMORY_AND_DISK)
    
  • persist 方法核心逻辑:
    def persist(newLevel: StorageLevel): this.type = {
      if (storageLevel != StorageLevel.NONE && storageLevel != newLevel) {
        throw new UnsupportedOperationException("Cannot change storage level...")
      }
      storageLevel = newLevel
      this
    }
    
  • 执行时,RDD 的 computeOrReadCheckpoint 方法判断是否已经缓存:
    if (isCached) {
      SparkEnv.get.blockManager.getOrElseUpdate(blockId, ...)
    } else {
      compute(split, context)
    }
    

作用

  • 加速重复计算:避免重复计算 DAG 中的父节点。
  • 默认存储级别为 MEMORY_AND_DISK,当内存不足时,溢写磁盘。

适用场景

  • 数据需要被多次使用,但不需要跨作业的容错能力。
  • 计算代价大,但内存能够容纳数据。

2.2 Persist

persistcache 的增强版,允许用户选择存储级别(StorageLevel),如:

  • MEMORY_ONLY
  • MEMORY_AND_DISK
  • DISK_ONLY
  • 堆外内存、序列化存储等。

源码分析

  • StorageLevel 是一个枚举类,定义了各种存储级别:
    case class StorageLevel(
      useDisk: Boolean,
      useMemory: Boolean,
      useOffHeap: Boolean,
      deserialized: Boolean,
      replication: Int
    )
    
  • persist 方法直接调用 BlockManager 存储数据,核心逻辑:
    blockManager.putIterator(
      blockId,
      values,
      level,
      tellMaster = true
    )
    

作用

  • 提供更灵活的存储策略,适应内存、磁盘等不同环境。

适用场景

  • 数据较大,内存无法完全容纳,需要存储到磁盘或其他媒介。
  • 数据跨作业使用时(需确保存储级别满足作业要求)。

2.3 Checkpoint

checkpoint 会将 RDD 的数据保存到可靠存储(如 HDFS),并将 RDD 的依赖链打断,从而减少 DAG 深度,增强容错能力。

源码分析

  • RDDcheckpoint 方法:
    def checkpoint(): Unit = synchronized {
      if (doCheckpoint()) { // 检查是否需要 checkpoint
        val newRDD = new CheckpointRDD(this)
        this.rdd = newRDD // 更新依赖为 CheckpointRDD
      }
    }
    
  • CheckpointRDD 会从持久化存储中加载数据:
    override def compute(split: Partition, context: TaskContext): Iterator[T] = {
      val path = getCheckpointPath(split)
      val data = loadFromHDFS(path)
      data.iterator
    }
    

作用

  • 容错:数据保存到可靠存储中。
  • 优化 DAG:打断长依赖链,减少重算开销。

适用场景

  • 作业链较长,DAG 深度过大,容易导致重算开销。
  • 需要跨作业使用 RDD 数据,且要求数据容错性强。

3. 使用对比

特点CachePersistCheckpoint
存储位置内存(默认)或磁盘溢写多种存储级别可靠存储系统(如 HDFS)
容错性低,数据丢失需重算低至中,取决于存储级别高,数据可靠存储
DAG 优化有,打断依赖链
开销较低高(需要持久化和 I/O 操作)

4. 使用场景总结

Cache
  • 数据需要被频繁多次使用,且内存能够容纳。
  • 例如:在机器学习中对训练数据进行多次迭代。
Persist
  • 数据规模较大,内存无法完全容纳,需结合磁盘。
  • 例如:图计算中存储中间结果,避免重复计算。
Checkpoint
  • 作业链较长,可能因 DAG 深度导致失败或性能下降。
  • 需要跨作业的容错能力。
  • 例如:深度学习中的训练数据预处理、长链条依赖的 ETL 作业。

5. 综合优化建议

  • 优先考虑 cachepersist:仅当 DAG 深度问题显著时,使用 checkpoint
  • 设置合理的存储级别:根据内存和磁盘资源选择最优存储策略。
  • 结合 checkpointpersist:在 Checkpoint 前对数据 Persist,避免重新计算数据。

原文地址:https://blog.csdn.net/z1941563559/article/details/143894377

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!