仿站模板,注册安全工程师建设工程网站,潍坊网站建设哪家专业,有没有医学生做课件的网站最近在看黑马视频学习CAS相关的知识#xff0c;刚好也在学Spark#xff0c;凑一起学了。类中几个核心的方法#xff1a;CAS#xff08;Compare-And-Swap#xff09;是一种重要的无锁#xff08;lock-free#xff09;并发编程思想#xff0c;它在高性能框架 like Spark …最近在看黑马视频学习CAS相关的知识刚好也在学Spark凑一起学了。类中几个核心的方法CASCompare-And-Swap是一种重要的无锁lock-free并发编程思想它在高性能框架 like Spark 中无处不在。Spark 在其核心代码中大量使用了 Java 并发包java.util.concurrent中的原子变量Atomic Variables这些原子变量的底层实现正是基于 CPU 的 CAS 指令。阅读 Spark 源码中几个体现 CAS 思想的典型例子。1. 核心概念Java 原子变量Spark 中 CAS 思想最直接的体现就是广泛使用 java.util.concurrent.atomic包下的类例如AtomicReferenceAtomicIntegerAtomicLongAtomicBoolean这些类的 compareAndSet()、getAndSet()、getAndIncrement()等方法都是基于 CAS 实现的。2. 源码实例分析实例 1RPC 端点状态控制 (RpcEndpoint.scala)在 org.apache.spark.rpc包中RpcEndpoint的生命周期状态管理就使用了 CAS 来确保线程安全。核心思想 一个 RpcEndpoint 的状态如stopped可能会被多个线程同时修改例如同时调用 stop()方法但必须保证它只能被成功停止一次。简化代码逻辑
private val stopped new AtomicBoolean(false)def stop(): Unit {// 使用 CAS 操作来尝试将状态从 false 改为 true// 如果成功说明我是第一个执行 stop 的线程可以执行关闭逻辑// 如果失败说明其他线程已经执行了 stop我直接返回即可if (stopped.compareAndSet(false, true)) {// 执行真正的资源释放和关闭逻辑...doRealStop()}
}源码位置 你可以查看 org.apache.spark.rpc.RpcEndpoint相关的实现类状态控制通常采用这种模式。实例 2 工具类中的等待机制 (Utils.scala)在 org.apache.spark.util.Utils中有一个方法 waitUntil它使用 CAS 和自旋循环来等待某个条件被满足。核心思想 不断检查一个条件直到它成立。使用 volatile 变量保证可见性循环检查而非锁阻塞效率更高。简化代码逻辑
def waitUntil(p: Boolean, timeout: Long 100, maxTries: Int 600): Boolean {var tries 0while (!p tries maxTries) {Thread.sleep(timeout)tries 1 // 这个递增操作本身不是原子的但这里循环条件不依赖于它所以没问题}p
}虽然这个例子中没有直接的 AtomicInteger但它体现了 CAS 的“循环尝试”核心思想。更复杂的版本可能会用 AtomicInteger来记录 tries。源码位置 org.apache.spark.util.Utils#waitUntil实例 3 事件循环总线 (LiveListenerBus.scala)Spark 的事件监听总线 LiveListenerBus用于异步传递各种事件如任务开始、结束等。它的 started状态也使用 CAS 来管理。核心思想 确保监听总线只能被启动一次。简化代码逻辑
private val started new AtomicBoolean(false)def start() {// 尝试将 started 从 false 设置为 trueif (started.compareAndSet(false, true)) {// 成功执行启动逻辑thread.start()} else {// 失败说明已经启动抛出异常或忽略logError(LiveListenerBus already started!)}
}源码位置 org.apache.spark.scheduler.LiveListenerBus#start实例 4 任务内存管理 (TaskMemoryManager.scala)这是最复杂也是最能体现 CAS 优势的例子。在 org.apache.spark.memory.TaskMemoryManager中管理着每个 Task 的内存分配。其中有一个核心结构叫 PageTable它使用 AtomicReferenceArray来存储内存页MemoryBlock。核心思想 多个线程可能同时为同一个 Task 申请或释放内存页。使用 CAS 可以无锁地管理页表的分配和释放极大提升了并发性能。如果使用传统的锁内存分配将成为巨大的性能瓶颈。简化代码逻辑
// 页表是一个原子引用数组
private val pageTable new AtomicReferenceArray[MemoryBlock](maxPages)// 分配一个新页的近似逻辑
def allocatePage(): MemoryBlock {var pageNumber: Int -1// 循环尝试找到一个空位do {val foundPage findFreePage() // 寻找一个空页号if (foundPage -1) {return null // 没找到}// 关键使用 CAS 操作尝试将 pageTable 中 foundPage 位置从 null 设置为一个“占位符”// 如果成功说明这个页被我抢到了如果失败说明被其他线程抢走了继续循环寻找} while (!pageTable.compareAndSet(foundPage, null, placeholder))// 成功抢到页执行实际的内存分配...val page allocRealMemory()// 最后将占位符替换为真正的内存页对象pageTable.set(foundPage, page)page
}源码位置 org.apache.spark.memory.TaskMemoryManager。这里的代码非常复杂但 AtomicReferenceArray.compareAndSet是其无锁化的基石。总结在 Spark 源码中CAS 思想主要体现在以下几个方面状态标志位管理 例如组件RpcEndpoint, ListenerBus的启动、停止状态。使用 AtomicBoolean.compareAndSet确保状态转换的原子性和线程安全避免重复启动/关闭。无锁计数器和累加器 使用 AtomicInteger.getAndIncrement等进行计数避免 synchronized带来的性能开销。无锁数据结构 最典型的例子是 TaskMemoryManager中的页表管理使用 AtomicReferenceArray实现高性能的无锁内存分配这是支撑 Spark 高效内存计算的关键之一。乐观锁控制 广泛采用“循环尝试”的模式先读取值计算新值然后通过 CAS 更新。如果失败说明期间值被其他线程修改则回滚并重试。Spark 是一个高性能、高并发的分布式计算框架。在分布式环境下锁synchronized容易导致线程阻塞和死锁成为性能瓶颈。而 CAS 是一种乐观锁它假设竞争不激烈直接尝试更新如果失败就重试。在大多数情况下这种无锁操作的速度远快于锁操作从而极大地提升了框架的并发性能和吞吐量。这些代码可以在 Spark 源码项目中搜索 Atomic、compareAndSet等关键字会发现大量的应用场景。