ReentrantLock
ReentrantLock是一种可重入的互斥锁,它的行为和作用与关键字synchronized有些类似,在并发场景下可以让多个线程按照一定的顺序访问同一资源。相比synchronized,ReentrantLock多了可扩展的能力,比如我们可以创建一个名为MyReentrantLock的类继承ReentrantLock,并重写部分方法使其更加高效。
当一个线程调用ReentrantLock.lock()方法时,如果ReentrantLock没有被其他线程持有,且不存在额外的线程与当前线程竞争ReentrantLock,调用ReentrantLock.lock()方法后当前线程会占有此锁并立即返回,ReentrantLock内部会维护当前线程对锁的引用计数,当线程获取锁时会增加其线程对锁的引用计数,当线程释放锁时会减少线程对锁的引用计数,当前线程如果在占有锁之后,又重复获取锁,则会增加锁的引用计数,当锁的引用计数为0的时候,代表当前线程完全释放锁。需要注意的是,只有占有锁的线程才会增加锁的引用计数,当锁被占据时,如果有其他线程要竞争锁,ReentrantLock会把其他线程加入一个竞争锁的队列,并让线程陷入阻塞,直到占据锁的线程释放了锁,ReentrantLock才会唤醒队列中的线程重新竞争锁。
我们用下面的例子来加深对于锁的理解,假设我们的进程内目前没有任何线程竞争lock,此时锁的引用计数为0,有一个线程Thread-1调用完下面<1>处的lock()方法成功占有锁,此时锁的引用计数由0变为1。之后Thread-1调用了<2>处的methodB()方法,methodB()的<4>处又获取了一次锁,由于lock已经被Thread-1占据,所以这里简单的对锁的引用计数+1即可,此时锁的引用计数为2,Thread-1执行完methodB()的方法体后,执行<5>处的unlock()方法释放锁,这里对锁的引用计数-1,由2变为1。在调用完methodB后,执行methodA的方法体,最后执行<3>处的unlock()方法,将锁的引用计数由1变为0,Thread-1完全释放锁。此时,锁变为无主状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
private final ReentrantLock lock = new ReentrantLock(); public void methodA() { try { lock.lock(); //<1> methodB(); //<2> //methodA body... } finally { lock.unlock(); //<3> } } public void methodB() { try { lock.lock(); //<4> //methodB body... } finally { lock.unlock(); //<5> } } |
ReentrantLock提供了isHeldByCurrentThread()和getHoldCount()两个方法,前者用于判断锁是否被当先调用线程持有,如果被当前调用线程持有则返回true;后者不仅会判断锁是否被当前线程持有,还会返回锁相对于当前线程的引用计数,毕竟锁是可重入的,如果锁没有被任何线程持有,或者被不是持有锁的线程调用getHoldCount()方法,就会返回0。
这两个方法的实现原理也很简单,我们知道在Java中可以调用Thread.currentThread()来获取当前线程对象。当我们调用ReentrantLock.lock()方法成功获取锁之后,ReentrantLock内部会用一个独占线程(exclusiveOwnerThread)字段来标识当前占用锁的Thread线程对象,如果线程释放了锁且锁的引用计数为0,则将独占线程字段标记为null。当要判断锁是否被当前线程持有,或者锁相对于当前线程的引用计数,则获取调用方线程的Thread对象,和内部的独占线程字段做下对比,如果两者的引用相等,代表当前线程占用了锁,如果引用不相等,则表示当前所可能处于无主状态,或者锁被其他线程持有。
如下面的代码,我们希望只有持有lock的线程才可以执行methodB()和methodC()方法,就可以用isHeldByCurrentThread()和getHoldCount()进行判断。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
private final ReentrantLock lock = new ReentrantLock(); public void methodA() { try { lock.lock(); methodB(); methodC(); //methodA body... } finally { lock.unlock(); } } public void methodB() { if (lock.getHoldCount() != 0 ) { //methodB body... } } public void methodC() { if (lock.isHeldByCurrentThread()) { //methodC body... } } |
需要注意的一点是,官方有给出isHeldByCurrentThread()和getHoldCount()两个方法的使用范围,仅针对于debug和测试。真正的生产环境如果有重入锁的需要,官方还是推荐用try{}finally{}这一套,在try代码块里获取锁,在finally块中释放锁。
创建ReentrantLock对象时,如果使用的是无参构造方法,则默认创建非公平锁(NonfairSync),如果调用的是ReentrantLock(boolean fair)有参构造方法,fair为true则创建公平锁(FairSync)。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public class ReentrantLock implements Lock, java.io.Serializable { //... //默认创建非公平锁 public ReentrantLock() { sync = new NonfairSync(); } //根据参数指定创建公平锁或非公平锁,true为公平锁。 public ReentrantLock( boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } //... } |
之前说过,当有多个线程竞争锁时,获取锁失败的线程,会形成一个队列。如果有多个线程竞争公平锁时,会优先把锁分配给等待锁时间最长的线程,即队头的线程,队列中越往后的线程等待锁的时间越短,排在队尾的线程等待时间最短。如果使用的是非公平锁,则不保证会按照等待时长顺序将锁分配。在多线程的场景下,公平锁在吞吐量方面的表现不如非公平锁,但两者在获得锁和保证不饥饿的差异并不大。
需要注意的是,公平锁不能保证线程调度的公平性,竞争公平锁的多个线程中,可能会出现一个线程连续多次获得锁的情况。比如:Thread-1、Thread-2都要竞争同一个锁(lock),但此时锁已经被其他线程占据,Thread-1、Thread-2竞争失败,预备进入等待队列,这时Thread-1、Thread-2的CPU时间片消耗完毕被挂起,而其他线程刚好释放锁将锁变为无主状态,此时Thread-3抢锁成功,并调用下面的doThread3()方法,连续10次获取锁并释放锁将锁变为无主状态。这种情况,就是上面说的公平锁无法保证线程调度的公平性,按照顺序,Thread-3在Thread-1、Thread-2竞争失败后才开始竞争,按理锁的分配顺序应该是Thread-1->Thread-2->Thread-3,但由于线程的调度问题,Thread-1、Thread-2尚未入队,而锁被释放后刚好被Thread-3“捡漏”
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public void methodA() { try { lock.lock(); //methodA body... } finally { lock.unlock(); } } public void doThread3() { for ( int i = 0 ; i < 10 ; i++) { methodA(); } } |
除了调用ReentrantLock.lock()以阻塞的方式直到获取锁,ReentrantLock还提供了tryLock()和tryLock(long timeout, TimeUnit unit)两个方法来抢锁。我们看下面的代码,相信很多同学看到这两个方法后也能知道这两个方法和lock()方法的区别,tryLock()会尝试竞争锁,如果锁已被其他线程占用,则竞争失败,返回false,如果竞争成功,则返回true。tryLock(long timeout, TimeUnit unit)如果竞争锁失败后,会先进入等待队列,如果在过期前能竞争到锁,则返回true,如果在过期时间内都无法抢到锁,则返回false。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
public void methodD() { boolean hasLock = false ; try { hasLock = lock.tryLock(); //<1>非计时 if (!hasLock) { //没有抢到锁则退出 return ; } //methodD body... } finally { if (hasLock) { lock.unlock(); } } } public void methodE() { boolean hasLock = false ; try { hasLock = lock.tryLock( 5 , TimeUnit.SECONDS); //<2>计时 if (!hasLock) { //没有抢到锁则退出 return ; } //methodE body... } catch (InterruptedException e) { e.printStackTrace(); } finally { if (hasLock) { lock.unlock(); } } } |
需要注意的是:不管是公平锁还是非公平锁,不计时tryLock()都不能保证公平性,如果锁可用,即时其他线程正在等待锁,也会抢锁成功。
ReentrantLock内部会用一个int字段来标识锁的引用次数,因此,ReentrantLock虽然作为可重入锁,但它的最大可重入次数为2147483647(即:MaxInt32,2^31-1),不管我们是以递归或者是循环亦或者其他方式,一旦我们重复获取锁的次数超过这个次数,ReentrantLock就会抛出异常。
至此,我们了解了ReentrantLock的简单应用。下面,就请大家一起跟随笔者了解ReentrantLock的实现原理。下面的代码是笔者从ReentrantLock节选的部分代码,可以看到先前我们调用加锁(lock、lockInterruptibly、tryLock)、解锁(unlock)的代码,最后都会调用sync对象的方法,sync对象的类型是一个抽象类,在我们创建ReentrantLock对象时,会根据构造函数决定sync是公平锁(FairSync),还是非公平锁(NonfairSync),FairSync和NonfairSync都继承自Sync,所以ReentrantLock在创建好具体的Sync对象后,便不再管关心公平锁的逻辑或者是非公平锁的逻辑,ReentrantLock只知道抽象类Sync实现了它所需要的功能,这个功能是公平亦或是非公平,由具体的实现子类来关心。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
public void methodD() { boolean hasLock = false ; try { hasLock = lock.tryLock(); //<1>非计时 if (!hasLock) { //没有抢到锁则退出 return ; } //methodD body... } finally { if (hasLock) { lock.unlock(); } } } public void methodE() { boolean hasLock = false ; try { hasLock = lock.tryLock( 5 , TimeUnit.SECONDS); //<2>计时 if (!hasLock) { //没有抢到锁则退出 return ; } //methodE body... } catch (InterruptedException e) { e.printStackTrace(); } finally { if (hasLock) { lock.unlock(); } } } |
鉴于ReentrantLock的无参构造函数是创建一个非公平锁,可见官方更倾向于我们使用非公平锁,这里,我们就先从非公平锁开始介绍。
当ReentrantLock为非公平锁时,调用lock()方法会直接调用sync.acquire(1),NonfairSync和Sync两个类都没有实现acquire(int arg),这个方法是由AbstractQueuedSynchronizer(抽象队列同步器,下面简称:AQS)实现的,也就是Sync的父类。
当线程竞争锁时,会先调用tryAcquire(arg)方法试图占有锁,AQS将tryAcquire(int arg)的实现交由子类,由子类决定是以公平还是非公平的方式占有锁,如果竞争成功tryAcquire(arg)则返回true,!tryAcquire(arg)的结果为false,于是就不会再调用<1>处后续的判断,直接返回。如果占有锁失败,这里会先调用addWaiter(Node mode)方法,将当前调用线程封装成一个Node对象,再调用acquireQueued(final Node node, int arg)将Node对象加入到等待队列中,并使线程陷入阻塞。
1
2
3
4
5
6
7
8
9
10
11
12
|
//java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire public final void acquire( int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //<1> selfInterrupt(); } //AbstractQueuedSynchronizer将tryAcquire(int arg)的实现交由子类 //java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquire protected boolean tryAcquire( int arg) { throw new UnsupportedOperationException(); } |
我们先来看NonfairSync实现的tryAcquire(int acquires)方法,这里NonfairSync也是调用其父类Sync的nonfairTryAcquire(int acquires)方法。在AQS内部会维护一个volatile int state,可重入互斥锁会用这个字段存储占有锁的线程对锁的引用计数,即重复获取锁的次数。如果state为0,代表锁目前没有被任何线程占有,这里会用CAS的方式设置锁的引用计数,如果设置成功,则执行<2>处的代码将独占线程(exclusiveOwnerThread)的引用指向当前调用线程,然后返回true表示加锁成功。
如果当前state不为0,代表有线程正独占此锁,会在<3>处判断当前线程是否是独占线程,如果是的话则在<4>处增加锁的引用计数,这里同样是修改state的值,但不需要像<1>处那样用CAS的方式,因为<4>处的代码只有独占线程才可以执行,其他线程都无法执行。需要注意的一点是,state为int类型,最大值为:2^31-1,如果超过这个值state就会变为负数,就会报错。如果一个线程在竞争锁的时候,发现state不为0,且当前线程不是独占线程,则会返回false,表示抢锁失败。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
//当调用AQS的acquire(int arg)时,会先调用由子类实现的tryAcquire(int acquires)方法 //java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire protected final boolean tryAcquire( int acquires) { //这里会调用父类Sync的nonfairTryAcquire(int acquires)方法 return nonfairTryAcquire(acquires); } //java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire final boolean nonfairTryAcquire( int acquires) { //获取当前线程对象 final Thread current = Thread.currentThread(); //这里会获取父类AQS的state字段,在可重入互斥锁里,state表示占有锁的线程的引用计数 int c = getState(); //如果state为0,表示目前锁是无主状态 if (c == 0 ) { //如果锁处于无主状态,则用CAS修改state,如果修改成功,表示占有锁成功 if (compareAndSetState( 0 , acquires)) { //<1> //占有锁成功后,这里会设置锁的独占线程 setExclusiveOwnerThread(current); //<2> return true ; } } else if (current == getExclusiveOwnerThread()) { //<3>如果state不为0,代表现在有线程占据锁,如果请求锁的线程和独占线程是同一个线程,则增加当前线程对锁的引用计数 //锁的最大可重入次数为(2^31-1),超过这个最大范围,int就会变为负数,判断nextc为负数时报错。 int nextc = c + acquires; if (nextc < 0 ) // overflow throw new Error( "Maximum lock count exceeded" ); //重新设置state的值 setState(nextc); //<4> return true ; } return false ; } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //... //在可重入互斥锁中,state代表独占线程当前的重入次数 private volatile int state; protected final int getState() { return state; } protected final void setState( int newState) { state = newState; } //... } public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { //... //独占线程,当有线程占据可重入互斥锁时,会用此字段存储占有锁的线程 private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } } |
按照AbstractQueuedSynchronizer.acquire(int arg)的逻辑,如果抢锁失败,会继而执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)这段代码。这里我们需要先来了解下Node的数据结构,Node类是AQS的一个静态内部类。如果眼尖的同学看到下面的prev和next,一定能很快猜出这就是我们先前所说的等待队列,等待队列实质上是一个双端链表,即每个节点都可以知道自己的前驱,也可以知道自己的后继。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
//java.util.concurrent.locks.AbstractQueuedSynchronizer.Node static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = - 1 ; //... volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; //... //返回当前节点的前驱节点 final Node predecessor() { Node p = prev; if (p == null ) throw new NullPointerException(); else return p; } Node() {} //... //创建Node节点 Node(Node nextWaiter) { //<1> this .nextWaiter = nextWaiter; THREAD.set( this , Thread.currentThread()); } } |
这里简单介绍下Node的字段:
- prev指向当前节点的前驱节点,next指向当前节点的后继节点。
- thread字段在调用<1>处的构造方法时,会将thread指向当前调用线程的Thread对象。
- waitStatus(等待状态)初始值为0,当waitStatus为SIGNAL(-1)时,表示当前节点的后继节点所指向的线程(node.next.thread)陷入阻塞,当前节点如果被移除(CANCELLED)或在占有锁后要释放锁的时候,需要唤醒后继节点的线程。这里有多种可能导致当前节点的等待状态变为移除,比如调用tryLock(long timeout, TimeUnit unit) 超时会获取到锁,或者调用lockInterruptibly()后线程被中断。
- nextWaiter可以用来表示一个节点的线程到底是独占线程(EXCLUSIVE)还是共享线程(SHARED),独占线程一般用于可重入互斥锁(ReentrantLock)或者可重入读写锁(ReentrantReadWriteLock)的写锁,而共享线程则表示当前线程是可以和其他共享线程一起共享资源的,一般用于可重入读写锁的读锁。
如果对上面Node字段还有不理解的地方不用心急,笔者在后面还会和大家一起深入了解这几个字段。
在简单了解了Node的数据结构后,我们来看看AQS是如何将一个线程封装成一个Node对象,并将其加入到等待队列。addWaiter(Node mode)会根据传入的参数node,决定创建的节点是独占节点还是共享节点,先前ReentrantLock传入的是Node.EXCLUSIVE,所以这里是独占节点,在执行完<1>处的代码后,节点创建完毕,节点的thread字段也保存了当前线程对象的引用。之后会进入<2>处的循环,这里是通过CAS自旋的方式将节点加入到等待队列,之所以用这种方式是因为可能存在多个线程同时要入队的情况,用CAS自旋保证每个节点的前驱和后继的有序性。当节点要入队时,会先获取尾节点,如果在<3>处判断尾节点不为null,则将当前节点的前驱指向尾节点,并用CAS的方式设置当前节点为设置为尾节点,如果原先的尾节点(oldTail)的指向没有被任何线程修改,这里用CAS将当前节点设置成尾节点就会成功,于是原先尾节点的后继指向当前节点,当前节点入队成功。但我们也要考虑尾节点为null的情况,即第一个进入等待队列的节点,此时头节点(header)和尾节点(tail)都为null,这里就会执行<4>处的分支,进行队列初始化。初始化队列的时候,同样存在并发问题,所以这里依旧用CAS初始化头节点成功,再将头节点指向的Node对象赋值给尾节点。初始化队列完毕后,会再开始新的一轮循环,用CAS的方式尝试将节点入队,入队成功后,则返回当前节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //... private transient volatile Node head; //等待队列的头节点 private transient volatile Node tail; //等待队列的尾节点 //... private Node addWaiter(Node mode) { //为竞争锁的线程创建一个Node对象,并用Node.thread字段存储调用线程Thread对象 Node node = new Node(mode); //<1> for (;;) { //<2> Node oldTail = tail; if (oldTail != null ) { //<3> node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { //<4> initializeSyncQueue(); } } } private final void initializeSyncQueue() { Node h; if (HEAD.compareAndSet( this , null , (h = new Node()))) tail = h; } private final boolean compareAndSetTail(Node expect, Node update) { return TAIL.compareAndSet( this , expect, update); } //... } |
在执行完addWaiter(Node.EXCLUSIVE)确定节点入队后,就要将返回节点传入到方法:acquireQueued(final Node node, int arg)。之前我们说过,抢锁失败的节点会进入一个等待队列,等待锁的分配,我们已经在addWaiter(Node mode)看到线程是如何入队的,那接下来就要看看线程是如何等待锁的分配。在看acquireQueued(final Node node, int arg)之前,我们先来思考下如果是我们自己会如何设计将锁分配给线程?最简单的做法是每个线程都在一个死循环中去轮询锁的状态,如果发现锁处于无主状态并抢锁成功,线程则跳出循环访问资源。但这个做法有个缺点就是会消耗CPU时间片,尤其对于一些优先级不高的线程,相比于优先级高的线程它们可能永远无法竞争到锁,永远访问不到资源处于饥饿状态。那么有没有相比死循环更好的做法呢?我们是否可以先把一个入队的线程阻塞起来,先让它不要消耗宝贵的CPU时间片,当占据锁的线程完全释放锁(state变为0)时,则去唤醒队列中等待时长最长的线程,这样也不用担心优先级低的线程无法与优先级高的线程竞争锁,导致处于饥饿状态,一举两得。
这里我们还要再加深下对等待队列Node的理解才能往下看acquireQueued(final Node node, int arg),大家思考下,Node中的thread字段是用来指向竞争锁的线程对象,通过这个对象,我们可以用释放锁的线程唤醒等待锁的线程,占用锁的线程在完全释放锁将锁变为无主状态后,唤醒等待锁的线程,这个等待锁的线程如果成功占据了锁,是否可以将本身线程中Node.thread置为null?此刻线程已经占据了锁,它不会再陷入阻塞,也不需要有其他的线程来唤醒自身。所以等待队列的头节点的thread(header.thread)字段永远为null,因为锁被头节点的线程所占用。
当然,也可能出现锁被占用但头节点(header)本身就为null,这种情况一般出现在我们初始化好一个ReentrantLock后,只有一个线程占有了锁,此时调用tryAcquire(int acquires)会调用ReentrantLock.Sync.nonfairTryAcquire(int acquires)方法,这个方法只会简单修改state状态,并不会新增一个头节点。除非锁已有线程占据,且出现新的线程竞争锁,这时候新的线程在进入等待队列的时候,会初始化队列,为本身占据锁的线程补上一个头节点,初始化队列的时候调用的是Node的无参构造方法,所以头节点的thread字段为null,表示锁被当前头节点原先指向的线程所占据。
在了解这些基本知识后,下面我们终于可以来看看大家迫不及待的acquireQueued(final Node node, int arg)了。当把封装了当前线程的Node对象传入到acquireQueued(final Node node, int arg)方法时,并不会立即阻塞当前线程等待其他线程唤醒。这里会先在<1>处获取当前节点的前驱节点p,判断p是不是头节点,如果p是头节点,则当前线程即有占有锁的可能。因为占据锁的线程会先释放锁,再通知队列中的线程抢锁。所以会存在当前节点入队前锁已被释放的情况,于是判断前驱节点p是头节点,会再调用tryAcquire(int acquires)方法抢锁,如果抢锁成功,就可以按照我们上面所说的套路,调用setHead(Node node)将当前节点设置为头节点,设置当前节点的线程引用为null,然后返回。
如果当前节点的前驱节点不是头节点,这里就要调用shouldParkAfterFailedAcquire(Node pred, Node node)设置前驱节点的等待状态(waitStatus),先前说过,这个等待状态可以用来表示下个节点的阻塞状态。假设有一个锁已经被其他线程占有,Thread-1、Thread-2要来抢锁,此时必然是抢锁失败的,这里会把Thread-1、Thread-2分别封装成Node1和Node2并进行入队,Node1和Node2初始的等待状态都为0,假定Node1先Node2入队,Node1为Node2的前驱节点(即:Node2.prev=Node1),Node1不是头节点,所以不会去抢锁,这里直接进入<2>处分支的shouldParkAfterFailedAcquire(Node pred, Node node)方法,Node1的初始等待状态为0,所以<3>处和<5>处的分支是进不去的,只能进入<4>处的分支,将Node1的等待状态设置为SIGNAL,表示Node1的后继节点处于等待唤醒状态,然后返回false,于是<2>处的判断不成立,又开始新的一轮循环,假定头节点的线程依旧没释放锁,Node1依旧不是头节点,还是直接执行shouldParkAfterFailedAcquire(Node pred, Node node)方法,此时判断Node2的前驱节点Node1的等待状态为-1,表示可以阻塞Node1后继节点Node2所指向的线程,所以这里会返回true,进入<2>处的分支,调用parkAndCheckInterrupt()方法,在这个方法中会调用LockSupport.park(Object blocker)阻塞当前的调用线程,直到有其他线程调用LockSupport.unpark(Node2.thread)唤醒Node2被阻塞的线程,或Node2.thread被中断才会退出parkAndCheckInterrupt()。我们注意到在<5>处有一个判断,前驱节点的等待状态>0,一般状态为CANCELLED(1),表示前驱节点被移除。之所以会存在被移除的节点,是因为我们可能以tryLock(long timeout, TimeUnit unit)的方式往等待队列中添加节点,如果超时还未获得锁,这个节点就要被移除;我们还可能用lockInterruptibly()的方式往等待队列中添加节点,如果节点所对应的线程被中断,这个节点也处于被移除状态。所以<5>处如果发现前驱节点的等待状态大于0,会一直往前驱节点遍历直到找到等待状态<=0的节点将其作为前驱节点,并将前驱节点的后继指向当前节点。要注意的是,等待状态为-1时,代表当前节点的后继节点等待唤醒,>0的时候,代表当前节点被移除,前者的状态与后继节点有关,后者的状态仅与自身有关。如果在自旋期间线程出现其他异常,则会调用<6>处的代码将节点从等待队列移除,并抛出异常。cancelAcquire(Node node)会在后面介绍,这里我们只要先知道这是一个将节点从队列中移除的方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //... private transient volatile Node head; //... final boolean acquireQueued( final Node node, int arg) { boolean interrupted = false ; try { for (;;) { final Node p = node.predecessor(); //<1> if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node)) //<2> interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); //<6> if (interrupted) selfInterrupt(); throw t; } } //... //设置当前节点为头节点,此时可以清空头节点指向的线程引用 private void setHead(Node node) { head = node; node.thread = null ; node.prev = null ; } //... private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //<3> /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) {//<5> /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else {//<4> /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } return false ; } //... private final boolean parkAndCheckInterrupt() { //阻塞调用线程,可调用LockSupport.unpark(Thread thread)唤醒或由线程中断唤醒。 LockSupport.park( this ); //返回线程是否由中断唤醒,返回true为被中断唤醒,但此方法会清除线程的中断标记 return Thread.interrupted(); } //... } |
能从boolean acquireQueued(final Node node, int arg)方法中返回的线程,都是成功占有锁的线程,但返回结果分当前线程是否被中断,true为被中断。可能存在这样一种情况,前一个线程释放锁完毕后,即将唤醒后一个线程,此时后一个线程被中断唤醒,后一个线程发现其Node节点的前驱节点为头节点,且锁为无主状态,于是抢锁成功直接返回。这里要标记线程的中断状态interrupted,因为线程会从parkAndCheckInterrupt()中被唤醒,最后会执行Thread.interrupted()返回当前线程是否由中断唤醒,但Thread.interrupted()会清除中断标记,所以在占据锁之后会根据返回的interrupted状态,决定是否设置线程的中断状态。如果一个线程在调用acquireQueued(final Node node, int arg)方法的后都未被中断,直到前一个线程调用LockSupport.unpark(Thread thread)唤醒该线程,那么这个线程就不是用中断的形式唤醒,也就不用设置线程的中断状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //... public final void acquire( int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //根据acquireQueued()的返回,决定是否设置线程的中断标记 selfInterrupt(); } //... static void selfInterrupt() { Thread.currentThread().interrupt(); } //... } |
到此这篇关于Java源码解析之详解ReentrantLock的文章就介绍到这了,更多相关ReentrantLock源码解析内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://www.cnblogs.com/beiluowuzheng/p/14948225.html