asp源代码网站,北京营销推广网站建设,长尾关键词,s上海网站建设目录
1.概论
1.1.实现锁的要素
1.2.阻塞队列
1.3.Lock接口和Sync类
2.各种锁
2.1.互斥锁
2.1.1.概论
2.1.2.源码
1.lock()
2.unlock()
2.2.读写锁
2.3.Condition
2.3.1.概论
2.3.2.底层实现 1.概论
1.1.实现锁的要素
JAVA中的锁都是可重入的锁#xff0c;因为…
目录
1.概论
1.1.实现锁的要素
1.2.阻塞队列
1.3.Lock接口和Sync类
2.各种锁
2.1.互斥锁
2.1.1.概论
2.1.2.源码
1.lock()
2.unlock()
2.2.读写锁
2.3.Condition
2.3.1.概论
2.3.2.底层实现 1.概论
1.1.实现锁的要素
JAVA中的锁都是可重入的锁因为不可重入的试用的时候很容易造成死锁。这个道理很好想明白
当一个线程已经持有一个锁并在持有该锁的过程中再次尝试获取同一把锁时如果没有重入机制第二次请求会被阻塞因为锁已经被自己持有。这会导致线程自我死锁因为它在等待自己释放的锁。
可重入是指获取锁的线程可以继续重复的获得此锁。其实我们想都能想到要实现一把锁需要些什么首先肯定是 标志位也叫信号量标记锁的状态和重入次数这样才能完成持有锁和释放锁。
接下来要考虑的是拒接策略当前锁被持有期间后续的请求线程该怎么处理当然可以直接拒绝JAVA的选择委婉点选择了允许这些线程躺在锁上阻塞等待锁被释放。要实现让线程躺在锁上等待我们想想无非要 需要支持对一个线程的阻塞、唤醒 需要记录当前哪个线程持有锁 需要一个队列维护所有阻塞在当前锁上的线程
OK以上四点就是JAVA锁的核心总结起来就是信号量队列分别用来记录持有者和等待者。
1.2.阻塞、唤醒操作
首先我们来看看阻塞和唤醒的操作在JDK中提供了一个Unsafe类该类中提供了阻塞或唤醒线程的一对操作 原语——park/unpark
public native void unpark(Object var1);
public native void park(boolean var1, long var2);
这对原语最终会调用操作系统的程序接口执行线程操作。
1.2.阻塞队列
拿来维护所有阻塞在当前锁上的线程的队列能是个普通队列吗很显然不是它的操作必须是线程安全的是吧所以这个队列用阻塞队列实现才合适。什么是阻塞队列
阻塞队列提供了线程安全的元素插入和移除操作并且在特定条件下会阻塞线程直到满足操作条件。
说到JDK中的阻塞队列其核心就是AbstractQueuedSynchronizer简称AQS由双向链表实现的一个元素操作绝对安全的队列用来在锁的实现中维护阻塞在锁上的线程上的队列的这个角色。
来看看AQS的源码
它有指向前后节点的指针、有一个标志位state、还有一个提供线程操作原原语阻塞、唤醒的unsafe类。 所以其实AQS就长这样 点进源码可以看到其随便一个方法都是线程安全的 由于本文不是专门聊AQS这里就不扩展了反正知道AQS是一个线程安全的阻塞队列就对了。
1.3.Lock接口和Sync类
JAVA中所有锁的顶级父接口用来规范定义一把锁应该有那些行为职责
public interface Lock {void lock();void lockInterruptibly() throws InterruptedException;boolean tryLock(long time, TimeUnit unit) throws InterruptedException;void unlock();Condition newCondition();
}
JAVA中所有锁的实现都是依托AQS去作为阻塞队列每个锁内部都会实现一个Sync内部类在自身Sync内部以不同的策略去操作AQS实现不同种类的锁。
abstract static class Sync extends AbstractQueuedSynchronizer {......}
2.各种锁
2.1.互斥锁
2.1.1.概论
ReentrantLock互斥锁ReentrantLock本身没有任何代码逻辑依靠内部类Sync干活儿
public class ReentrantLock implements Lock, Serializable {private final ReentrantLock.Sync sync;public void lock() {this.sync.lock();}public void unlock() {this.sync.release(1);}......
}
ReentrantLock的Sync继承了AQS
abstract static class Sync extends AbstractQueuedSynchronizer {......}
Sync是抽象类有两个实现 NonfairSync公平锁 FairSync非公平锁
实例化ReentrantLock的实例时根据传入的标志位可以创建公平和公平的实现
public class ReentrantLock implements Lock, java.io.Serializable{
public ReentrantLock() {sync new NonfairSync();}
public ReentrantLock(boolean fair) {sync fair ? new FairSync() : new NonfairSync();}......
}
}
2.1.2.源码
1.lock()
公平锁的lock()
static final class FairSync extends Sync {final void lock() {acquire(1);//进来直接排队}
非公平锁的lock():
static final class NonfairSync extends Sync {final void lock() {if (compareAndSetState(0, 1))//进来直接抢锁setExclusiveOwnerThread(Thread.currentThread());//将锁的持有者设置为当前线程elseacquire(1);//没抢过再去排队}}
acquire是AQS的模板方法
tryAcquire尝试再去获取一次锁公平锁依然是排队抢去看看阻塞队列是否为空非公平锁依然是直接抢。
acquireQueued将线程放入阻塞队列。
public final void acquire(int arg) {if (!tryAcquire(arg) acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
acquireQueued..是lock()最关键的一部分addWaiter..把Thread对象加入阻塞队列acquireQueued..完成对线程的阻塞。
final boolean acquireQueued(final Node node, int arg) {boolean failed true;try {boolean interrupted false;for (;;) {final Node p node.predecessor();if (p head tryAcquire(arg)) {//如果发现自己在队头就去拿锁setHead(node);p.next null; // help GCfailed false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) parkAndCheckInterrupt())//调用原语阻塞自己interrupted true;}} finally {if (failed)cancelAcquire(node);}}
acquireQueued..函数有一个返回值表示什么意思 呢虽然该函数不会中断响应但它会记录被阻塞期间有没有其他线 程向它发送过中断信号。如果有则该函数会返回true否则返回false。所以才有了以下逻辑
public final void acquire(int arg) {if (!tryAcquire(arg) acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}public final void acquire(int arg) {if (!tryAcquire(arg) acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
当 acquireQueued.. 返回 true 时会调用 selfInterrupt 自己给自己发送中断信号也就是自己把自己的中断标志位设 为true。之所以要这么做是因为自己在阻塞期间收到其他线程中 断信号没有及时响应现在要进行补偿。这样一来如果该线程在loc k代码块内部有调用sleep之类的阻塞方法就可以抛出异常响 应该中断信号。
2.unlock()
unlock的逻辑很简单每次unlockstate-1直到state0时将锁的拥有者置null释放锁。由于只有锁的持有线程才能操作lock所以unlock()不需要用CAS操作时直接判断一下是不是锁的持有线程在操作即可。
public void unlock() {sync.release(1);}
public final boolean release(int arg) {if (tryRelease(arg)) {//释放锁Node h head;if (h ! null h.waitStatus ! 0)unparkSuccessor(h);//唤醒阻塞队列中的后继者return true;}return false;}
释放锁
protected final boolean tryRelease(int releases) {int c getState() - releases;//每次unlockstate减1if (Thread.currentThread() ! getExclusiveOwnerThread())//判断是不是锁的持有线程throw new IllegalMonitorStateException();boolean free false;if (c 0) {//state为0表示该锁没有被持有free true;setExclusiveOwnerThread(null);//将锁的持有者置null}setState(c);return free;}
唤醒后继者
private void unparkSuccessor(Node node) {int ws node.waitStatus;if (ws 0)compareAndSetWaitStatus(node, ws, 0);Node s node.next;if (s null || s.waitStatus 0) {s null;for (Node t tail; t ! null t ! node; t t.prev)if (t.waitStatus 0)s t;}if (s ! null)LockSupport.unpark(s.thread);} 2.2.读写锁
读写锁是一个实现读写互斥的锁读写锁包含一个读锁、一个写锁
public interface ReadWriteLock{Lock readLock();Lock writeLock();
}
读写锁的使用就是直接调用对应锁进行锁定和解锁
ReadWriteLock rwLocknew ReetrantReadWriteLock();
Lock rLockrwLock.readLock();
rLock.lock();
rLock.unLock();
Lock wLockrwLock.writeLock();
wLock.lock();
wLock.unLock();
读写锁的Sync内部类对读锁和写锁采用同一个int型的信号量的高16位和低16位分别表示读写锁的状态和重入次数这样一次CAS就能统一处理进行读写互斥操作
abstract static class Sync extends AbstractQueuedSynchronizer {static final int SHARED_SHIFT 16;static final int SHARED_UNIT (1 SHARED_SHIFT);static final int MAX_COUNT (1 SHARED_SHIFT) - 1;static final int EXCLUSIVE_MASK (1 SHARED_SHIFT) - 1;static int sharedCount(int c) { return c SHARED_SHIFT; }static int exclusiveCount(int c) { return c EXCLUSIVE_MASK; }
}
2.3.Condition
2.3.1.概论
condition用于更加细粒度的控制锁上面的线程阻塞、唤醒。
以下以一个经典的生产、消费者问题为例
队列空的时候进来的消费者线程阻塞有数据放进来后唤醒阻塞的消费者线程。
队列满的时候进来的生产者线程阻塞有空位后唤醒阻塞的生产者线程。 锁粒度的实现
public void enqueue(){synchronized(queue){while(queue.full()){queue.wait();}//入队列......//通知消费者队列中有数据了queue.notify();}
}
public void dequeue(){synchronized(queue){while(queue.empty()){queue.wait();}//出队列......//通知生产者队列中有空位了可以继续放数据queue.notify();}
}
可以发现唤醒的时候把阻塞的生产消费线程一起唤醒了。
条件粒度的实现
private final Lock lock new ReentrantLock();
private final Condition notFull lock.newCondition(); // 用于等待队列不满
private final Condition notEmpty lock.newCondition(); // 用于等待队列非空public void enqueue(Object item) {try {while (queue.isFull()) {notFull.await(); // 等待队列不满}// 入队列操作// ...// 入队后通知等待的消费者notEmpty.signal();} catch (InterruptedException e) {Thread.currentThread().interrupt(); // 保持中断状态// 处理中断逻辑} finally {queue.unlock();}
}public void dequeue() {try {while (queue.isEmpty()) {notEmpty.await(); // 等待队列非空}// 出队列操作// ...// 出队后通知等待的生产者notFull.signal();} catch (InterruptedException e) {Thread.currentThread().interrupt(); // 保持中断状态// 处理中断逻辑} finally {queue.unlock();}
} 2.3.2.底层实现
Condition由Lock产生因此Lock中持有Condition
public interface Lock {......Condition newCondition();
}
承担功能的其实就是Syn中的ConditionObject也就是AQS中的ConditionObject
final ConditionObject newCondition() {return new ConditionObject(this);}
一个Condition上面阻塞着多个线程所以每个Condition内部都有一个队列用来记录阻塞在这个condition上面的线程这个队列其实也是AQS实现的AQS中除了实现一个以Node为节点的队列还实现了一个以ConditionObject为节点的队列
public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizerimplements java.io.Serializable {public class ConditionObject implements Condition, java.io.Serializable {private static final long serialVersionUID 1173984872572414699L;private transient Node firstWaiter;private transient Node lastWaiter;......}}
Condition是个接口定义了一系列条件操作
public interface Condition {void await() throws InterruptedException;void awaitUninterruptibly();long awaitNanos(long var1) throws InterruptedException;boolean await(long var1, TimeUnit var3) throws InterruptedException;boolean awaitUntil(Date var1) throws InterruptedException;void signal();void signalAll();
}