经过AQS的前世和今生,我们已经知道AQS是Java中提供同步状态原子管理、线程阻塞/唤醒和线程排队功能的同步器基本框架。所以今天我们将学习通过AQS实现的东西ReentrantLock
。按照惯例,我们先来看看三个关于ReentrantLock
常见面试题:
- 什么是
ReentrantLock
? ReentrantLock
内部原理是什么?如何实现可重入性?ReentrantLock
和synchronized
有什么区别?如何选择?
接下来,我将尽可能多地通过分析源代码来回答上述问题。
什么是Reentrantlock?ReentrantLock
翻译成可重入锁。在《一篇文章中理解并发编程中的锁》中,我们解释了锁的可重入特性:同一线程可以多次锁定,也就是说,可以重复锁定的逻辑。
Doug Lea就是这样描述的ReentrantLock
的:
A reentrant mutual exclusion {@link Lock} with the same basic behavior and semantics as the implicit monitor lock accessed using {@code synchronized} methods and statements, but with extended capabilities.
“A reentrant mutual exclusion Lock说明Reeentrantlock除了具有可重入的特点外,还是一把互斥锁。然后看下面的内容,ReentrantLock
与使用synchronized
方法/句子有相同的基本行为和语义。最后 but with extended capabilities"则表明了ReentrantLock
具有更好的扩展能力。
那么可重入互斥锁就是可重入互斥锁ReentrantLock
一切?别担心,让我们回头看:
The constructor for this class accepts an optional fairness parameter. When set true, under contention, locks favor granting access to the longest-waiting thread. Otherwise this lock does not guarantee any particular access order.
Reentrantlock提供了两种公平/非公平模式,可以通过构造器参数指定公平模式。
嗯,到目前为止,我们已经对了ReentrantLock
根据《一文理解并发编程中的锁》中的分类,有了更清晰的认知,ReentrantLock
本质是互斥锁,具有可重入的特点。此外ReentrantLock
还实现了公平和非公平两种模式。
ReentrantLocak
使用非常简单:
ReentrantLock lock = new ReentrantLock();lock.lock();// 业务逻辑lock.unlock();
通过无参构造器创建ReentrantLock
对象后,调用lock
和unlock
加锁解锁操作。除无参结构器外,ReentrantLock
还提供了一个参构造器:
// publicic无参构造器 ReentrantLock() { sync = new NonfairSync();}// publicic有一个参构造器 ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync();}
FairSync
和NonfairSync
是ReentrantLock
构造器可以指定内部类ReentrantLock
公平模式或非公平模式。具体来说,我们先按下不表,先来看看ReentrantLock
提供的其他方法。
除了常用的lock
外,ReentrantLock
还提供了三种加锁方法:
// 试着获得锁public boolean tryLock();// 尽量获得锁,否则排队等待指定时间的public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException;// 试着获得锁public void lockInterruptibly() throws InterruptedException;
tryLock
直接尝试获得锁,特点是竞争失败时直接返回false,不会进入队列等待。重载方法tryLock(long timeout, TimeUnit unit)
在队列中增加的最大等待时间,如果锁竞争失败,将加入等待队列,再次尝试获得锁,直到加班或中断。
lockInterruptibly
特点是,调用thread.interrupt
抛出后中断线程InterruptedException
异常,结束竞争。虽然。lock
也允许中断线程,但不会抛出异常。
除常用的加锁方法外,ReentrantLock
还提供了分析锁的方法:
方法声明
作用
public int getHoldCount()
返回当前线程持有锁的次数,即当前线程重入锁的次数
public final int getQueueLength()
返回等待获得锁的线程数量估算值
public final boolean hasQueuedThread(Thread thread)
查询当前线程是否在等待获取锁
public final boolean hasQueuedThreads()
有没有等待获得锁锁的线程?
public final boolean isFair()
是否为公平锁
public boolean isHeldByCurrentThread()
当前线程是否持有锁
public boolean isLocked()
锁是否线程持有,即锁是否使用
public Condition newCondition()
创建条件对象
public int getWaitQueueLength(Condition condition)
在这种情况下等待线程的数量
public boolean hasWaiters
在这种情况下,是否有线程等待
Rentrantlock源码分析接下来,我们通过源代码进行分析ReentrantLock
对比不同加锁方法的实现差异,实现公平/非公平模式和重入性原理。
让我们先了解一下ReentrantLock
的结构:
public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync; // 同步器 abstract static class Sync extends AbstractQueuedSynchronizer {} // 非公平模式同步器 static final class NonfairSync extends Sync {} // 公平模式同步器 static final class FairSync extends Sync {}}
ReentrantLock
仅仅实现了Lock
接口没有直接继承AbstractQueuedSynchronizer
,其内部类Sync
继承AbstractQueuedSynchronizer
,并提供了FairSync
和NonfairSync
公平锁和非公平锁是两种实现。
我们已经知道,创建公平/非公平模式可以指定不同的参数ReentrantLock
,不同的反应到源代码使用不同的反应Sync
的实现类:
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync();}
而且在加锁/解锁操作中,都是由Sync
实现类完成,ReentrantLock
只是对Lock
接口的实现:
public class ReentrantLock implements Lock, java.io.Serializable { public void lock() { sync.acquire(1); } public void unlock() { sync.release(1); }}
让我们回忆一下《AQS的今生,构建JUC的基础》acquire
方法:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } }}
AQS本身只在等待队列中添加了线程acquireQueued
该方法预留了获取锁的方法tryAcquire
方法。
所以我们不难想,ReentrantLock
作用机制:从AQS继承Sync
,实现了tryAcquire
获取锁的方法,并使用AQSacquireQueued
实现排队功能,而ReentrantLock
是否公平,是否tryAcquire
实现方式密切相关。
FairSync
很简单,只做了tryAcquire
实现方法:
static final class FairSync extends Sync { @ReservedStackAccess protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 获得同步状态,AQS实现 int c = getState(); // 判断同步状态 // c == 0时表示没有线程持有锁 // c != 0时表示有线程持有锁 if (c == 0) { // hasQuedpredessors判断是否有等待锁的线程 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 线程重入,同步状态+1 int nextc = c + acquires; if (nextc < 0) { throw new Error("Maximum lock count exceeded"); } // 更新同步状态 setState(nextc); return true; } return false; }}
当c == 0
当锁未被任何线程持有时,通过hasQueuedPredecessors
判断是否有等待锁定的线程。如果没有等待的线程,则通过compareAndSetState(0, acquires)
尝试替换同步状态以获得锁;当c != 0
当锁被线程持有时,通过current == getExclusiveOwnerThread
判断当前线程是否持有,如果是,则认为是重新进入和执行int nextc = c + acquires
,更新同步状态setState(nextc)
,并返回成功。
FairSync
在获得锁之前,体现了公平性hasQueuedPredecessors
,确认是否有线程在等待锁,如果有,tryAcquire
执行失败,默默执行AQSacquireQueued
加入等待队列。
NonfairSync
也只是做了tryAcquire
实现,而且只是用了父类nonfairTryAcquire
方法:
static final class NonfairSync extends Sync { protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }}abstract static class Sync extends AbstractQueuedSynchronizer { @ReservedStackAccess final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) { throw new Error("Maximum lock count exceeded"); } setState(nextc); return true; } return false; }}
nonfairTryAcquire
与FairSync#tryAcquire
就像两颗豌豆一样,忽略方法声明的唯一区别是,当c == 0
时,nonfairTryAcquire
并不会调用hasQueuedPredecessors
确认有线程是否在等待获取锁,而是直接通过compareAndSetState(0, acquires)
尝试更换同步状态以获得锁。
NonfairSync
不公平反映在获取锁之前,不会确认是否有线程等待锁,而是直接获取锁。如果获取失败,AQS仍将执行acquireQueued
加入等待队列。
在《AQS的今生,构建JUC的基础》中提到ReentrantLock
再入性取决于同步状态state作为计数器的特性,在公平锁定中FairSync
和非公平锁NonfairSync
在实现过程中,我们还可以看到,同步状态+1的操作将在线程重新进入时进行:
int nextc = c + acquires;setState(nextc);
既然lock
操作中有同步状态+1的操作,所以unlock
操作中必须有同步状态-1的操作:
public class ReentrantLock implements Lock, java.io.Serializable { public void unlock() { sync.release(1); } abstract static class Sync extends AbstractQueuedSynchronizer { @ReservedStackAccess protected final boolean tryRelease(int releases) { // 线程退出,同步状态-1 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) { throw new IllegalMonitorStateException(); } boolean free = false; if (c == 0) { // 同步状态为0,锁没有被持有,释放独占锁 free = true; setExclusiveOwnerThread(null); } setState(c); return free; } }}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != null && h.waitStatus != 0){ unparkSuccessor(h); } return true; } return false; }}
tryRelease
同步状态-1后,如果同步状态为0,则表示锁未被持有,修改锁的独家线程,然后更新同步状态。
我们再来看ReentrantLock
实现可重入性很简单吗?判断线程是否重入取决于它getExclusiveOwnerThread
方法,获取当前独家锁的线程,记录重新进入的次数取决于同步状态作为计数器的特性。
现在我能理解为什么ReentrantLock
中lock
要与unlock
操作成对吗?最后,提出一个小问题。为什么?lock
和unlock
在操作中,只有当c == 0
时的lock
CAS需要操作吗?
我们之前已经知道了ReentrantLock
提供的四种加锁方法是:
public void lock()
,最常用的加锁方法,允许中断,但不会抛出异常,加锁失败进入等待队列;public void lockInterruptibly()
,允许中断和抛出InterruptedException
异常情况下,加锁失败进入队列,直到被唤醒或中断;public boolean tryLock()
,试着直接加锁,加锁失败不会进入队列,而是直接返回false;public boolean tryLock(long timeout, TimeUnit unit)
,试着直接加锁,中断时扔出去InterruptedException
异常情况下,加锁失败进入队列,直到指定时间内加锁成功或加班。
lock
调用方法:
public class ReentrantLock implements Lock, java.io.Serializable { public void lock() { sync.acquire(1); }}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } }}
lockInterruptibly
调用方法:
public class ReentrantLock implements Lock, java.io.Serializable { public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } if (!tryAcquire(arg)) { doAcquireInterruptibly(arg); } }}
可以看出,差异主要体现在acquireQueued
和doAcquireInterruptibly
的实现上:
final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; return interrupted; } // 当parkandcheckinterupt为true时,修改interupted标记以中断 if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; }}private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; return; } // 当parkandcheckinterupt为true时,抛出异常 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; }}
从源代码的角度来看,差异反映在正确的方面parkAndCheckInterrupt
在结果处理方法上,acquireQueued
只标记中断状态,而doAcquireInterruptibly
异常直接抛出。
public boolean tryLock()
实现非常简单:
public boolean tryLock() { return sync.nonfairTryAcquire(1);}
直接调用Sync#nonfairTryAcquire
,我们已经知道在非公平锁的内容之前nonfairTryAcquire
如果不调用AQS,只是尝试了一次不公平的加锁。acquireQueued
等待队列不会加入。
tryLock
重载方法并不复杂。按照以前的习惯,应该有特殊的acquireQueued
实现:
public class ReentrantLock implements Lock, java.io.Serializable { public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // 计算超时时间 final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; return true; } // 判断超时间 nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) { cancelAcquire(node); return false; } // 调用LockSupportort.parknanos暂停指定时间 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) LockSupport.parkNanos(this, nanosTimeout); // 线程中断抛出异常 if (Thread.interrupted()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }}
public boolean tryLock(long timeout, TimeUnit unit)
依赖于特征LockSupport.parkNanos
暂停线程指定时间的能力。另外,在判断是否需要park时,我们可以注意到是的nanosTimeout
与SPIN_FOR_TIMEOUT_THRESHOLD
的判断:
- 当
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD
park和upark对性能的影响小于自旋nanosTimeout
纳秒; - 当
nanosTimeout < SPIN_FOR_TIMEOUT_THRESHOLD
park和upark对性能的影响大于自旋nanosTimeout
纳秒。
在这里,我们完成了四种锁定方法之间的差异。一般逻辑是相似的(例如,唤醒头部节点),只是为了添加一些细节来实现一些特性,你可以仔细阅读源代码,很容易看到差异。
结语关于ReentrantLock
内容到此结束,因为AQS的部分已经分开拆除,所以今天没有太复杂的内容。你可以专注于它ReentrantLock
如何利用AQS实现公平/非公平模式,以及可重新进入的特点,如getHoldCount
,isFair
我相信你能想象这种方法是如何实现的,你可以结合源代码来验证你的想法。
最后,我希望今天的内容能帮助你更清楚地理解ReentrantLock
,假如文章中有错误,也希望大家不吝赐教。