AQS实现原理
AQS实现原理
在jdk 1.6之前 synchronized 这个重量级锁性能一直较为低下, 在1.6之后,进行了大量的锁优化,如偏向锁,自旋锁,但是与lock相比,synchronized 还是存在以下缺陷,缺少了获取锁的可操作性,可中断,超时获取锁,且他为独占式在高并发场景下性能大打折扣。
使用了Lock,必然离不开核心组件AQS
什么是AQS呢?
AQS(AbstractQueuedSynchronizer )即队列同步器,他是构建锁或者其他同步组件的基础框架,如ReentrantLock、ReentrantReadWriteLcok、Semaphore、countDownLacth等
AQS是一个抽象类,主要是通过继承方式使用,本身没有实现任何借口,仅仅是定义了同步状态的获取和释放 的方法。AQS解决了类实现同步器的大量细节问题,例如获取同步状态,FIFO队列,入队和出队/自定义同步器在实现的时候只需要实现共享资源state的获取和释放即可,至于获取资源失败入队/唤醒出队等,AQS在顶层已经定义好了。
AQS中关于state的定义有三种
getState():返回同步状态的当前值。
setSate(int newState)设置当前同步状态
compareAndSetState(int expect ,int update) 使用CAS设置当前状态,该方法能够保证状态设置的原子执行
自定义同步器实现时主要实现以下几种方法
第一类:子类实现的方法,AQS不作处理(模版方法)
tryAcquire(int arg) 独占获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态。
tryRelease(int arg) 独占式释放同步状态
tryAcquireShared(int arg) 共享式获取同步状态,返回值大于等于0 则表示获取成功,否则获取实现。
tryReleaseShared(int arg) 共享式释放同步状态
isHeldExclusively 当前同步器是否在独占模式下被线程占用,一般该方法表示同步器是否被当前线程独占。
第二类:AQS本身的实现的方法,定义给子类通用实现的方法。
aquire(int arg) 独占式的获取锁的操作,独占式获取同步状态都调用的方法,通过子类的tryAquire 方法判断是否获取到锁。
acquireShared(int arg )共享式的获取锁的操作,在读写所中用到,通过tryAquireShared方法判断是否获取同步状态。
release(int args) 独占式的释放同步状态 ,通过tryRelease方法判断是否释放了独占式同步状态
releaseShard(int arg) 共享式的释放同步状态,通过tryReleaseShared 方法判断是否已经释放了共享同步状态
AQS的两种功能
从使用层面来说,AQS功能分为两种:独占和共享
独占锁,每次只能一个线程池有锁,比如ReentrantLock就是独占锁
共享锁,允许多个线程池有锁,并发访问共享资源,比如ReentrantREadWriteLock
共享锁和独占锁的释放有一定的区别,前面部分是一致的,先判断头节点是不是signal状态,如果是则唤醒头节点的下一个节点,并将该节点设置为头节点。而共享锁不一样,某个节点被设置为head之后,如果他的后继节点是shared状态,那么会尝试使用doReleaseShared方法尝试唤醒和节点,实现共享状态的传播。
AQS内部实现
AQS是依赖内部的同步队列实现,也就是FIFO双向队列,如果当前线程竞争锁失败,那么AQS会把当前线程已经等待状态封装成一个Node节点, 加入到同步队列中,同时阻塞该线程,当同步状态释放时,会把首节点唤醒,使其在此尝试获取同步状态。
AQS队列内部维护的是一个双向链表,这种结构每个数据都有两个指针,分别只想直接的钱去节点和后继节点,当线程抢占锁失败的时候,会封装成Node加入到AQS中去。
在同步队列中,一个节点表示一个线程,他保存这个线程的引用ThreadId,状态(watiStatus) ,前驱节点(pre),后继节点(next),其数据结构如下,
节点状态(waitStatus)
每个节点包含了线程的等待状态,是否被阻塞,是否等待唤醒,是否被取消。变量waitStatus则表示当前节点Node节点的等待状态,共有5种取值,
cancelled,signal,condition,propagate,0
cancelled(1) 表示当前节点已经取消调度。 当timeout 或者中断的情况下。
signal(-1)表示当前节点释放锁的时候,需要唤醒下一个节点,或者说后继节点等待当前节点唤醒,候机节点入队的时候,会将前驱节点更新给singnal
condition(-2)当其他线程调用了condition的signal 方法后,condition状态的节点会从等待队列转移到同步队列中,等待获取同步锁。
propagate(-3)共享模式下,前驱节点不仅会唤醒其他后继节点,同时也可能唤醒后继的后继节点
0 新节点入队的时候的默认状态
添加节点addWatier
入队操作就是tail指向新节点,新节点的前驱节点pre指向之前当前的最后的节点,当前最后的节点next指向新节点,相关操作在addWaiter方法里
private Node addWaiter(Node mode){
// - 根据给定的模式(独占或者共享) 新建Node
Node node = new Node(Thread.currentThread(),mode);
// - 快速尝试添加尾节点
Node pred = tail;
if(pred != null){
node.prev = pred;
// - CAS 设置尾节点
if(compareAndSetTail(pred,node)){
pred.next = node;
reture node;
}
}
// - 多次尝试
enq(node);
return node;
}
addWaiter(Node node) 先通过快速尝试设置尾节点,如果失败,则调用enq(Node node)方法设置尾节点
private Node enq (final Node node){
// - 多次尝试 直到成功为止
for(;;){
Node t = tail;
// - tail 不存在,设置为首节点
if(t == null){
if(compareAndSetHead(new Node())){
tail = head;
}
} else {
// - 设置尾节点
node.prev = t;
if(compareAndSetTail(t,node)){
t.next = node;
return t;
}
}
}
}
此方法用于将Node 加入队尾,核心就是通过CAS自旋的方式设置尾节点。假如有两个线程t1, t2,同时进入enq方法,t ==null 表示队列首次使用,需要先初始化,另一个线程cas失败,则进入下次循环,同归哦cas操作将node添加到队尾。
同步状态的获取和释放
独占式同步状态获取
acquire(int arg) :独占式的获取锁,此方法不响应中断,在这个过程中中断,线程不会从同步队列中移除,也不会立马中断,在整个过程结束后再自我中断
public final void acquire(int arg){
if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE))){
selfInterrupt();
}
}
tryAcquire:去尝试获取锁,获取成功后设置锁的状态并返回trure,否则直接返回false。该方法由自定义的同步组建实现,该方法的实现必须保证线程安全的获取同步状态。
addWaiter如果tryAcquire返回false(获取锁失败)则调用该方法将当前线程封装成节点加入到同步队列尾部,并标记为独占模式。addWaiter(Node.EXCLUSIVE),Node.EXCLUSIVE即为独占模式
acquiredQureued当前线程获取失败后,就进入了一个自选的状态,如果在等待过程中被中断过(如timeout),就返回true(接着自我中断),否则返回false,也就是说当前线程进入同步队列后自选状态,每个节点会自我观察,当条件满足时,获取到同步状态后,就可以从这个自旋过程中退出,否则一直执行下去。
如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。因为在等待队列中自旋状态的线程是不会响应中断的,它会把中断记录下来。如果在自旋时发生过中断,就返回trure,然后就会执行selfInterrupt()方法。而这个方法就是简单的中断当前线程Thread.currentThread().interrupt();其作用就是补上在自旋时没有响应的中断。
tryAcquire 在NonfairSync的实现
final boolean nonfairTryAcquire(int acquires){
final Thread current = Thread.currentThread();
// - 获取锁同步状态
int c = getState();
if(c == 0){ // - 0 表示无锁状态
// cas竞争锁,替换state 的值改为1
if(compareAndSetState(0,acquires)){
// - 保存当前获得锁的线程
setExclusiveOwnerThread(current);
reture ture;
}
}
// - 如果是同一个线程来获得锁,则直接增加冲入次数
else if (current == getExclusiveOwnerThread()){
int nextc = c + acquires;// 增啊急重入次数
if(nextc < 0 )
throw new Error("jMaximum lock count exceeded");
setState(nextc);
reutrn trure;
}
return false;
}
acquiredQueued 的实现
前线程进入同步队列后进入自旋状态,每个节点会自我观察,当条件满足时,获取到同步状态后,就可以从这个自旋过程中退出(返回interrupted为false),否则一直执行下去,也不是每一个节点都有获取锁的资格,因为是FIFO的先进先出的队列,aqquireQueued方法保证了只有头部节点的后继节点才有资格去获取同步状态。
final boolean acquireQuueued(final Node node,int arg){
// - 标记是否成功拿到资源
boolean failed = true;
try{
// - 中断标志
boolean interrupted = false;
// - 自旋,一个死循环
for(;;){
// - 获取当前线程的前驱节点
final Node p = node.predecessor();
// - 当前线程的前驱节点是头节点,即改节点是第二个节点,且获取锁成功
if(p == head && tryAcquire(arg)){
// - 将head指向该节点
setHead(Node);
// - 方便垃圾回收
p.next = null;
failed = false;
// - 返回等待过程中是否被中断过
return interrupted;
}
// - 获取失败,就需要阻塞了,线程就进入waiting状态,直到被unpark();
if(shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt())
// - 如果等待过程中被中断过一次,就标记为true;
interrupted = trure;
} finally {
if(failed){
cancelAcquire(node);
}
}
}
}
shouldParkAfterFailedAcquire(Node pred,Node node);判断一个线程是否阻塞
private static boolean shouldParkAfterFailledAcquire(Node pred,Node node){
int ws = pred.waitStatus;// 拿到前驱节点的状态
if(ws == Node.SIGNAL){
// 状态为SIGNAL,如果前驱节点处于等待状态,直接返回true
reture true;
}
if(ws > 0){
// - 如果前驱节点放弃了,那就直接往前找,直到找到最近一个正常等待的状态,并排在它的后边
// - 注意:那些放弃的节点,由于被自己“加塞”到它们的前边,它们相当于形成一个无引用链,稍后就会被GC回收,这个操作实际是把队列中的cancelled节点剔除掉
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// - 如果前驱节点正常,那就把前驱的状态通过CAS 的方式设置成SIGNAL
compareAndSetWaitStatus(pred,was,Node.SIGNAL);
}
return false;
}
1、如果当前线程的前驱节点状态为SIGNAL,则表明当前线程需要被阻塞,调用unpark()方法唤醒,直接返回ture,当前线程阻塞
2、如果当前线程的前驱节点状态为CANCELLED(ws > 0),则表明该线程的前驱节点已经等待超时或者被中断了,则需要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态 <=0,返回false
3、如果前驱节点飞SIGNAL,非CANCELLED,则通过CAS的方式将其前驱节点设置为SIGNAL,返回false
整个流程中,如果前驱节点的状态不是SIGNAL,那么自己就不能安心的去休息,需要去找一个安心的休息点(前驱节点状态<=0),也就是只有当前前驱节点为SIGNAL时这个线程才能进入等待状态。shouldParkAfterFailedAcquire -> 检查上一个节点状态,如果是SIGNAL就阻塞,否则就改成SIGNAL
上面如果shouldParkAfterFailedAcquire(Node pred,Node node)方法返回true,则调用parkAndCheckINterript()方法阻塞当前线程
parkAndCheckInterript()阻塞当前线程
前面的放啊放是否是阻塞,而这个方式就是真正的执行阻塞的方法同时返回中断状态
private final boolean parkAncCheckInterrupt(){
// - 调用park() 使线程进入waiting状态
LockSupport.park(this);
// - 如果被唤醒,查看自己是不是被中断的。
return Thread.interrupted();
}
parkAndCheckInterrupt()方法主要是把当前线程挂起,从而阻塞住线程的调用栈,同时返回当前线程的中断状态。
release(int arg);独占式的释放锁
public final booean release(int arg){
if(tryRelease(arg)){// 子类自定义实现
Node h = head;
if(h != null && h.waitStatus != 0){
unmparkSuccessor(h);// 唤醒下一个节点
}
return ture;
}
return false;
}
释放锁的流程很简单,首先子类自定义的方法如果释放了同步状态,如果头节点不为空并且头节点的等待状态不为0就唤醒其后继节点。主要依赖的就是子类自定义实现的释放操作
unmparkSuccessor(Node node);唤醒后继节点获取同步状态
private void unparkSuccessor(Node node){
// - 获取头节点的状态
int ws = node.waitStatus;
if(ws < 0){
compareAndSetWaitStatus(node,ws,0);// - 通过CAS将头节点的状态设置为初始状态
}
Node s = node.next;// 后继节点
if(s == null || s.watiStatus > 0){// - 不存在或者已经取消
s = null;
for(Node t = tail; t != null && t != node; t = t.prev){ // - 从尾节点开始往前遍历,寻找离头节点最新的等待状态正常的节点
if(t.watiStatus <= 0){
s = t;
}
}
}
if(s != null){
LockSupport.unpark(s.thread);// - 真正的唤醒操作
}
}
唤醒操作,通过判断后继节点是否存在,如果不存在就寻找等待时间最长的适合的节点将其唤醒唤醒操作通过LockSupport 中的unpark方法唤醒底层也就是unsafe类的操作
锁释放移除节点
head节点表示获取锁成功的节点,当头节点在释放同步状态时,会唤醒后继节点,如果后继节点获得锁成功,会把自己设置为头节点,节点变化过程如下:
这个过程涉及到两个变化
设置头节点指向下一个获得锁的节点
新的获得锁的继诶眈,将pre指针指向null
将原来的head节点的next设置为null,即断开原head节点的next引用
acquire方法流程总结
首先通过子类判断是否获得了锁,如果获取了就什么也不干。tryAcquire
如果没有获取锁,通过线程创建节点加入同步队列的队尾。addWaiter
当线程在同步队列中不断的通过自旋去获取同步状态,如果获取了锁,就把其设为同步队列中的头节点,否则在同步队列中不停的自旋等待获取同步状态
acquireQueued,shouldParkAfterFailedAcquire(Node pre,Node node),parkAndCheckInterrupt()
如果在获取同步状态的过程中被中断过最后自行调用interrupted方法进行中断操作
总结
总结的来锁:线程获取锁,如果获取了锁就 保存当前获取锁的线程,如果没获取就创造了一个节点通过compareAndSetTail(jCAS操作)操作的方法将创建的节点加入同步队列的尾部,在同步队列中的节点同步自旋的操作不断去获取同步状态【当然由于FIFO先进先出的特征】等待时间越来越长就越先被唤醒。当头节点释放同步状态的时候,首先查看是否存在后继节点,如果存在就唤醒自己的后继节点,如果不存在就获取等待时间最长的复合条件的线程。
为什么AQS需要一个虚拟head节点
每一个节点都必须设置前节点的ws状态为SIGNAL(-1) 因为每个节点在休眠前,都需要将前置节点的ws设置成SIGNAL,否则自己永远无法被唤醒,所以必须要一个前置节点,而这个前置节点,实际上就是当前持有锁的节点。
由于第一个节点他是没有前置节点的,就创建一个假的。
总结下来就是:每个节点都需要设置前置节点的ws状态(这个状态为是为了保证数据一致性),而第一个节点是没有前置节点的,所以需要创建一个虚拟节点。
另外,补充redis
- 本文标签: 多线程
- 本文链接: http://www.ityoulove.com/article/43
- 版权声明: 本文由崔健宇原创发布,转载请遵循《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权