JUC源码详细分析之Part2——《锁》
本系列笔记笔者将会详细的讲解JUC中核心类的源码,其中基于的JDK版本为
1.8.0_291
。由于JUC内容很多,因此笔者将笔记拆为四部分:另外,本系列笔记部分内容参考自《深入浅出 Java Concurrency》
3. 锁
锁是JUC中最核心的内容,在本章我们将讲解JUC下常见的锁,以及介绍下著名的AQS框架。
在讲解源码前,我们先来介绍一些基本的但常见的概念。
锁是什么?解决了什么问题?
在多线程下,我们希望有些代码是线程安全的执行的,我们将这样的代码行叫临界区。锁保证了临界区的代码可以像单条指令一样原子的执行,当某个线程持有锁的时候,就可以执行临界区的代码,其他线程此时是不可以的,只有这个持有锁的线程释放锁才能继续由下一个线程执行。
悲观锁,乐观锁
悲观锁:每次都以悲观的态度假设最坏的情况,每次拿数据的时候都认为别人会修改,所以每次在操作前会先上锁,操作后再将锁释放,Java中的
synchronized
和ReentrantLock
都是悲观锁。乐观锁:每次都是乐观的态度假设当前是最好的情况,每次拿数据的时候都认为别人不会修改,所以不会上锁,但是在修改的时候会判断别人有没有修改这个数据(通过CAS和版本号实现),Java中的
AtomicInteger
就是乐观锁的一种实现,依赖了CAS。我们一般也认为悲观锁是阻塞锁,而乐观锁是非阻塞锁。非阻塞指的是不阻塞其他线程,即自己可以失败可以阻塞挂起,但不能因为自己的失败和挂起影响其他线程导致其他线程失败挂起。
很多乐观锁的实现都依赖于CAS,但CAS本质是没有锁的,虽然叫乐观锁但它并没有锁。CAS只是通过比较和替换的方式,只有比较之后符合要求才进行替换,其中比较和替换是个原子操作,这其中并没有上锁以及并不会干扰任何其他线程,通常CAS都是自旋的,不断的尝试比较和替换,所以这里可以看到CAS是非阻塞锁,他的执行并不会干扰其他线程。由于CAS没有加锁,所以需要通过CAS同步修改的变量都应该使用volatile修饰符,这样保证变量的修改可以被其他线程看到。
通常而言乐观锁应用于写少读多的情况,悲观锁应用于写多读少的情况。
可重入锁,不可重入锁
所谓可重入锁就是如果一个线程持有了锁,那么它可以重复执行需要这种锁的临界区代码,可以进行重复加锁。反之为不可重入锁。
如:
public void test(){ lock.lock(); try { while (x > 10){ x--; test(); } }finally { lock.unlock(); } }
独占锁,共享锁
所谓独占锁和共享锁是对资源而言的,如果某个资源只能同时被一个线程访问,那么就需要使用独占锁。而如果可以被所有线程一起访问,那么就可以使用共享锁。其中独占锁又叫互斥锁。大多数情况下我们在说锁的时候都是默认独占锁。
公平锁,非公平锁
如果获取一个锁是按照请求的顺序得到的,那么就是公平锁,否则就是非公平锁。
这里有一篇很棒的文章,可以方便大家对Java中的锁有个大体了解:
不可不说的Java“锁”事 - 美团技术团队 (meituan.com)
3.1 Lock与AQS
Java下的Lock接口定义如下:
public interface Lock {
//获取锁,如果锁不可用,出于线程调度目的,将禁用当前线程,并且在获得锁之前,该线程将一直处于休眠状态。
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//释放锁
void unlock();
Condition newCondition();
}
JUC下大部分锁的实现均是继承自该接口,其中Lock比较重要的两个函数是lock
和unlock
,我们在讲具体实现类的时候也主要讲这两个接口的实现。
在讲JUC下锁的实现类前,我们先想下如果让你设计一个锁,实现lock()
方法,你需要做什么?
首先要判断是否允许当前线程获取锁,对于是否允许获取锁,得有一个状态位,这个状态位的修改必须是线程安全的。其次如果允许获取锁,就获取,否则就阻塞等待或者直接获取失败。若线程需要阻塞等待,那么就需要一个阻塞等待的队列。最后阻塞的时候,线程往往需要挂起不消耗资源。当状态位由不允许获取变更到允许获取时,得唤醒队列里的线程,并且线程得从阻塞队列中移除出去。
综上我们发现lock()
方法需要以下条件:
- 一个标志锁同步状态的原子操作的状态位。
- 一个有序的队列,方便管理所有阻塞等待的线程。
- 线程未获得锁,等待时可能需要挂起,当可以获得锁时再唤醒等待的线程。即线程的挂起和唤醒功能。
而这便是AQS框架的核心。AQS全称AbstractQueuedSynchronizer
,它是JUC中最复杂最核心的一个类,后面我们讲到的CountDownLatch/FutureTask/ReentrantLock/RenntrantReadWriteLock/Semaphore等都是在其基础上实现的。
在AQS中锁的同步状态是通过volatile int
来实现的
public abstract class AbstractQueuedSynchronizer{
//...
private volatile int state;
//...
}
看到volatile的时候很多同学应该也明白这个状态位线程安全的修改都是依赖CAS。
AQS中提供了如下方法来访问状态位:
//获得状态位
protected final int getState()
//直接修改状态位
protected final void setState(int newState)
//通过CAS线程安全的修改状态位
protected final boolean compareAndSetState(int expect, int update)
线程的挂起和唤醒通过LockSupport
类来实现的,LockSupport
底层是JNI,其性能比Java原本的线程挂起和唤醒效率更高。LockSupport的API很简单:
//挂起当前线程
public static void park()
//唤醒当前线程
public static void unpark(Thread thread)
而阻塞队列在AQS中使用CLH队列的变体,简单来讲就是一个FIFO的包含头尾节点的双向的队列。队列节点主要包含属性如下:
class Node {
//节点的等待状态,主要包含SIGNAL/CANCELLED/CONDITION/PROPAGATE等状态
volatile int waitStatus;
//前置节点
volatile Node prev;
//后置节点
volatile Node next;
//绑定的线程
volatile Thread thread;
//主要用于Condition
Node nextWaiter;
}
同时AQS持有队列的头尾节点:
public abstract class AbstractQueuedSynchronizer{
//...
private transient volatile Node head;
private transient volatile Node tail;
//...
}
有了这些基础后,我们就可以简单的实现一个锁了,我们使用AQS框架,模拟一下lock()
的简单实现
- 调用
lock()
方法,通过锁的同步状态位判断当前线程是否允许获得锁,如果允许获取就结束,否则进入步骤2 - 由于不允许获取锁,将当前线程封装为Node节点,加入到CLH队列,判断当前线程是否需要挂起等待
- 如果需要挂起等待就调用
LockSupport.park()
方法挂起当前线程,否则就持续不断的尝试获取锁。
而unlock()
流程也很简单:
- 调用
unlock()
,释放锁,修改锁的状态位 - 如果后继节点是挂起等待的,就将后继节点唤醒,让他继续重新尝试获取锁。
看,借助AQS我们已经很轻松的实现了一个锁,后面我们讲的所有不同锁的实现类都与上述流程基本相同。另外贴一下AQS框架的整体架构图:
上图中有颜色的为Method,无颜色的为Attribution。
我们将AQS框架分为五层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据。对于自定义的锁继承AQS的时候只需要重写第一层的方法即可,不需要关心底层的实现逻辑。自定义锁对于加锁和释放锁会进入第二层,如果锁获取失败就会进入第三和第四层对等待队列 进行处理,而这些处理方式均依赖于第五层的基础数据提供层。
如果你还是不太了解AQS,没关系,看完下面的实现类我想你会对AQS有更清晰的认识。
3.2 ReentrantLock
ReentrantLock是JUC提供的一个可重入锁和独占锁。其使用场景大致如下:
public class LockDemo{
private int value;
private Lock lock = new ReentrantLock();
public int get(){
this.lock.lock();
try {
return this.value;
}finally {
this.lock.unlock();
}
}
public void set(int value){
this.lock.lock();
try {
this.value = value;
}finally {
this.lock.unlock();
}
}
public int getAndSet(int value){
this.lock.lock();
try {
int temp = this.value;
this.value = value;
return temp;
}finally {
this.lock.unlock();
}
}
public boolean compareAndSet(int expect,int update){
this.lock.lock();
try {
if(value==expect){
value = update;
return true;
}else {
return false;
}
}finally {
this.lock.unlock();
}
}
public int getAndIncrement(){
this.lock.lock();
try {
return this.value++;
}finally {
this.lock.unlock();
}
}
public int getAndDecrement(){
this.lock.lock();
try {
return this.value--;
}finally {
this.lock.unlock();
}
}
public int incrementAndGet(){
this.lock.lock();
try {
return ++this.value;
}finally {
this.lock.unlock();
}
}
public int decrementAndGet(){
this.lock.lock();
try {
return --this.value;
}finally {
this.lock.unlock();
}
}
}
可以看到我们通过锁实现了一个AtomicInteger
。
3.2.1 lock
下面我们就重点分析一下ReentrantLock加锁的原理
public class ReentrantLock implements Lock{
private final Sync sync;
}
ReentrantLock内部只有一个有意义的属性sync,其中Sync类是其内部类,这是个抽象类,继承自AQS
,该类有两个实现类分别是FairSync
和NonfairSync
,即公平锁和非公平锁。
默认的new ReentrantLock
构造方法构造的是非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
非公平锁的加锁会比公平锁稍微复杂一丢丢,为了源码流程的方便,我们先看公平锁的加锁过程:
当调用lock.lock()
时,本质上调用的是sync.lock()
public void lock() {
sync.lock();
}
我们现在看公平锁,因此调用的是公平锁的lock()
static final class FairSync extends Sync {
//...
final void lock() {
acquire(1);
}
//...
}
因此核心就是acquire(1)
方法,这个方法是AQS实现的:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个方法实际做了四个操作:
tryAcquire(1)
尝试获取锁,如果能获得就直接返回,否则进行步骤2addWaiter(Node.EXCLUSIVE)
将当前线程封装成Node节点,并将节点加入到CLH队列的最尾端acquireQueued()
自旋尝试获得锁,如果获得不了,就判断自己是否应park,如果应该就park,否则就一直尝试获取锁selfInterrupt()
设置中断位
下面我们会一个方法一个方法的看源码:
tryAcquire由FairSync实现,其源码如下:
//tryAcquire方法用于尝试获取锁
protected final boolean tryAcquire(int acquires) {
//得到当前请求线程
final Thread current = Thread.currentThread();
//获得当前AQS的状态(其实也是锁的状态,即前面说的一个状态位标识当前是否允许获取锁)
int c = getState();
if (c == 0) { //当前锁没被占用,可以获取锁
//如果阻塞队列确实是空(或头结点是当前线程),并且将锁的状态设置为1成功后(即更改锁的状态,避免其他线程进来)
//这里对于状态值的修改是通过CAS实现的
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//那么就将当前线程设为当前正在独占(运行)的线程,即AQS中会有个exclusiveOwnerThread字段,用于存储独占锁当前正在运行的线程
//exclusiveOwnerThread这个字段用于下面的可重入判断
setExclusiveOwnerThread(current);
//此时线程获取锁成功,返回true,即可以获得锁
return true;
}
}
//否则如果当前获取锁的线程是当前线程的话,那么也允许获取锁,可见这是一个可重入锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; //对于重复获取锁的线程,将状态位+1
//溢出校验 int值溢出
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
//这里对于状态值得修改不需要加锁或CAS,因为能走到这里,肯定正在运行的线程是当前线程,必不可能有其他线程进来一起修改这个值
setState(nextc);
//可重入锁也返回true,代表获取锁成功
return true;
}
//否则返回失败,代表获取锁失败
return false;
}
//这个方法用于判断当前线程是否满足获取锁,它是公平锁和非公平锁主要的区别
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
//h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
//翻译过来即 如果头尾不相等并且(头的下一个节点是空或者头结点的下一个节点不等于当前线程) 则返回true
//这样不利于理解,我们对这个逻辑整体取反,即如果(头尾相等) || (头节点的下一个节点不是空 并且头结点的下一个节点是当前线程)
//这就很好理解了
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
addWaiter源码如下:
//会执行这个函数意味着获取锁失败了,那么就开始将当前线程加入CLH队列,排队等待获取锁
//将当前线程封装为Node对象(Node对象即为我们之前说的CLH队列每个元素的类型),并将其加入到CLH队列的队尾,并返回构造的这个Node对象
private Node addWaiter(Node mode) {
//这里的传参mode是Node.EXCLUSIVE,表示是独占模式
//在new Node中,第一个参数被赋予了thread字段,第二个参数被赋予了nextWaiter字段,也即认为独占锁的nextWaiter是空的,也即独占锁不具有条件
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//如果当前队列的尾节点不等于空
if (pred != null) {
//那么就让我们之前构造的节点(该节点盛装当前线程)的pre指针指向队列尾节点
node.prev = pred;
//同时通过CAS将当前节点设为新的队列尾节点
if (compareAndSetTail(pred, node)) {
//成功后将之前尾节点的next指针指向当前节点
pred.next = node;
//完全完成后当前节点已经是尾节点,将构造的当前节点返回
return node;
}
}
//否则,这里的否则包括:
//1.尾节点等于空
//2.尾节点不等于空但是CAS操作将当前节点设置为尾节点失败了
//那么就执行enq函数,enq函数做的也是将当前节点置为尾节点
enq(node);
return node;
}
//在将当前节点设置为尾节点失败后,通过enq函数,保证将当前节点设为尾节点一定要成功
//首先将当前节点设为尾节点失败有两个原因:
//1.队列是空的(尾节点不存在)
//2.队列不为空,但CAS操作将当前节点设为尾节点失败了,一般是有其他线程在抢
//针对第一个问题就创建一个队列(创建一个头尾傀儡节点),这里为了保证创建成功会死循环一直创建
//针对第二个问题 也是无限尝试,直到通过CAS操作尝试成功。
private Node enq(final Node node) {
//死循环,无限尝试,直到成功
for (;;) {
//获得当前尾节点
Node t = tail;
if (t == null) {
//如果尾节点是空的话(代表整个CLH队列是空的不存在的或还没初始化的)
//那么就创建一个无意义节点,我们将这个无意义节点称之为傀儡节点
//通过CAS操作将傀儡节点设为CLH队列的头
if (compareAndSetHead(new Node()))
//并让尾节点也等于头
//此时完成了CLH的初始化,即只有一个无意义的傀儡节点,这个节点既是队列头也是队列尾巴
tail = head;
} else {
//否则
//一旦进入了这个else语句代表尾节点不再是空,也即CLH完成了初始化,如果初始化未成功(因为是CAS操作很可能不成功),那么就死循环初始化直到成功,成功后才能进入else
//将当前节点的pre指针指向队列的尾节点
node.prev = t;
//CAS操作设置当前节点为队列尾节点(如果不成功,死循环操作,无限尝试,直到成功)
if (compareAndSetTail(t, node)) {
//之前队列节点的next指针
t.next = node;
return t;
}
}
}
}
这里需要说明一下:
在初始的时候AQS内的CLH队列是没被创建的,只有需要的时候才会被创建。另外我们知道AQS具有队列的头尾指针,其中这个头指针指向的是个傀儡节点,永远一直都是傀儡节点。也即初始的时候,会创建一个无意义节点,这时头指针和尾指针都指向这个无意义节点,当有Node入队的时候,就将tail.next = node; tail = node;
,替换新的尾节点,但此时头节点的指向依然没有变,还是无意义节点。因此真正的头节点其实是head.next
,头节点是假的这个信息十分重要,在前面的hasQueuedPredecessors
函数其实就已经使用到了这一信息,后面我们还会频繁的用到,希望大家能在此处就记住。
acquireQueued源码如下:
//走到这里意味着线程获取锁失败,并且线程已经封装为节点,当前节点已经在CLH队列最尾端了
//参数node即addWaiter方法返回的节点,也即封装了当前线程的节点
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//中断标志位
boolean interrupted = false;
//无限尝试
for (;;) {
//获得当前节点的前置节点(即pre指针指向的节点)
final Node p = node.predecessor();
//如果当前节点的前置节点是头结点,那么就尝试获取锁
//这里我们又用到了头节点是傀儡节点这一信息
//如果当前节点的前置节点是头结点,那么就可以认为当前节点是真正的头结点,也即队列的第一个元素
//那么既然是队列的第一个元素,就具有获取锁的资格,因此执行tryAcquire函数获取锁
if (p == head && tryAcquire(arg)) {
//锁获取成功后,将当前节点设为头节点(新的傀儡节点,因为当前节点已经获得锁了,所以这个节点本身已经没有意义了,只是傀儡节点的意义了。)
setHead(node);
//将之前的头节点释放掉
p.next = null; // help GC
failed = false;
return interrupted;
}
//走到这里意味着当前节点不是真实头结点或当前节点是真实头结点但获取锁失败
//判断当前节点是否应该挂起,如果应该挂起就挂起当前节点
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//理论上如果线程获得了锁,那么failed就必然是false,如果线程没有获得锁,那么就应该for死等待获得锁
//会进入到这里,就说明线程没有获得锁并且被中断了,此时我们就需要更改节点的状态并从CLH队列中移除
cancelAcquire(node);
}
}
//判断当前节点是否应该挂起,如果需要挂起就返回true,否则返回false
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//得到当前节点的前置节点状态
int ws = pred.waitStatus;
//如果是signal状态,也即当前节点的前置节点也是阻塞未获取锁的,正常来说只有前面节点获得了锁,并且释放了锁,当前节点才能获得锁
//因此既然前置节点也是阻塞的,那么当前节点应该挂起等待(等到被前置节点唤醒)
if (ws == Node.SIGNAL)
return true;
//如果当前节点的前置节点状态大于0,也即cancelled状态,那么从当前节点开始,往前推,删除中间所有的状态大于0(即被cancelled的节点)
//相当于每次走到这个函数时都会重新整合CLH链表,剔除无意义的节点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//走到这个else代表当前节点的前置节点状态是0(因为ReentrantLock无-2,-3状态)
//因为前置节点后面有节点(即当前节点)需要被处理(适当的时候被唤醒),因此将当前节点的前置节点状态由0改为-1
//这里0和-1的区别是:
//如果一个节点状态是0,那么当他获得锁并且执行完指令释放锁之后,他会认为自己没有需要处理的继任节点,也即释放完锁不需要唤醒其他节点
//但对于节点状态是-1,那么当他获得锁执行完任务并且释放锁后,他会唤醒自己的继任节点,让继任节点获得锁开始运行
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//这里需要明确的一点是由于acquireQueued()方法是for(;;)形式,也即可能会不断的调用当前函数,所以某次返回false不代表当前线程不需要休眠,可能某次执行的是ws> 0 的分支,重排完队列后,下一次再进来就是执行ws == Node.SIGNAL,此时就需要挂起了。
return false;
}
//取消获取锁,在锁获取失败时执行
private void cancelAcquire(Node node) {
if (node == null)
return;
//释放当前节点绑定的线程
node.thread = null;
//获得当前节点的前置节点,判断当前节点前置节点是否是CANCELLED状态,
//如果是那么再往前找,如此循环,一直找到一个状态小于等于0的节点,将当前节点的前置节点设为那个节点
//这里其实就是在剔除当前节点前面的无效等待节点(状态大于0的节点都是无效节点)
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//此时pred已经是最靠近当前节点的有意义的前节点
Node predNext = pred.next;
//将当前节点状态设为CANCELLED,避免当前节点绑定的线程被唤醒
node.waitStatus = Node.CANCELLED;
//如果当前节点是尾节点,由于我们做了一些无意义节点的剔除工作,此时就需要重设一个有意义的尾节点
//很容易得出pred节点就是现在真实有意义的尾节点(前提是当前节点是尾节点)
if (node == tail && compareAndSetTail(node, pred)) {
//同时将pred节点(新的尾节点)的next指针指向空
compareAndSetNext(pred, predNext, null);
} else { //如果当前节点不是尾节点或者将pred节点设为尾节点失败
int ws;
//这个if比较长,但其实就说了一句话,如果pred节点是SIGNAL(有后续需要通知的节点),那么就如何如何
//我们可以一个一个看
//pred != head 前置节点不是傀儡头 并且
//pred.waitStatus== Node.SIGNAL 前置节点的状态是SIGNAL 或者
//ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL) 即如果前置节点不是SIGNAL但他是<=0的,那么就把它设置成SIGNAL状态
//所以归根结底,就是让pred状态为SIGNAL状态
//这里将pred状态设置为SIGNAL的原因也很简单,因为当前节点不是尾节点,那么就代表当前节点还有后继待唤醒的节点,但当前节点以及前面的一些连续无效节点要被删了,此时就需要新的节点来作为它们的前置节点,然后唤醒这些后继节点。因此将waitStatus设为SIGNAL,代表有后继需要唤醒的节点。
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
//走到这里代表pred的状态是SIGNAL了,如果当前节点的下一个节点不是CANCELLED,那么就将pred节点的next指针设为当前节点的next
//这样,链表就删除掉了当前节点和当前节点之前的那些CANCELLED状态节点,
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//否则唤醒继任节点 unparkSuccessor这个函数我们会在unlock时分析,目前只需要记住他是用来唤醒继任节点的。
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
//这里做个总结,这个函数其实就是在做一件事:整理CLH队列,剔除无意义节点。当然它只是剔除了自己以及自己之前的那些连续的无意义节点,及时的剔除无意义节点,有利于提高CLH队列的遍历效率
selfInterrupt源码如下:
//复原线程的中断标志
//只有线程中断过才会进这个函数 之前的parkAndCheckInterrupt()函数清空了线程的中断位
//这里相当于重新赋值线程中断位 之前清空的现在赋值回去
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
看完了公平锁整个加锁逻辑,我们再来看下非公平锁:
非公平锁的加锁只有两处代码不同,第一处是lock方法:
static final class NonfairSync extends Sync {
//...
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//...
}
在公平锁中,lock方法会直接调用acquire,而acquire会调用tryAcquire方法,这个方法在获得锁的时候会判断当前CLH队列是否为空或者当前节点是否为头节点,换句话说,公平锁严格要求获取锁的节点是队列的头节点,别的节点不能获得锁。但非公平锁不同,非公平锁在lock的时候会直接立马尝试获得锁,要是能立马得到锁成功就返回,否则才会执行acquire
,另外两者的tryAcquire方法也不同:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
不同点只有一处,非公平锁没有执行hasQueuedPredecessors
函数,而这个函数就是保证公平的核心函数。
后面所有的内容均与公平锁一致,比如入CLH队列的末尾节点,自旋获得锁,挂起等待等。
可以看见,公平锁与非公平锁主要区别就是在获取锁的时候,非公平锁可以先尝试着直接获取,不等其他任何节点,如果要是获取锁失败,依然会乖乖的将自己置为等待队列的尾节点等着获取锁。公平锁比较守规矩,每次获取锁的时候只有自己是队列的第一个时才会尝试获取锁。非公平锁有可能提高并发的性能,这是因为通常情况下挂起的线程重新开始与它真正开始运行,二者之间会产生严重的延时。因此非公平锁就可以利用这段时间完成操作。这是非公平锁在某些时候比公平锁性能要好的原因之一。
3.2.2 unlock
与lock一样,当我们调用lock.unlock时,实际调的是 sync.release()
public void unlock() {
sync.release(1);
}
其核心实现为AQS的release方法:
//锁释放的核心方法,其步骤就两步:第一步是修改锁的状态位,第二步是唤醒等待的下一个线程
public final boolean release(int arg) {
//尝试释放锁,由子类实现,其本质是修改状态位state
if (tryRelease(arg)) {
//如果锁释放成功并且当前节点的状态不等于0(其实就是<0,代表还有后继节点需要唤醒),那么就直接线程的唤醒操作
Node h = head;
if (h != null && h.waitStatus != 0)
//线程的唤醒操作
unparkSuccessor(h);
return true;
}
//否则,也即尝试释放锁失败直接返回false
return false;
}
//子类Sync实现,尝试释放锁,本质就是在修改state值,将state-1
protected final boolean tryRelease(int releases) {
//由于传入的releases == 1,因此这里就是state-1
int c = getState() - releases;
//如果释放锁的线程不是当前持有锁的线程直接抛错
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//c代表的是当前持有锁的数量(可重入的时候每次重入也会+1),如果c==0其实就是代表当前持有锁的线程是0个,那么就代表当前锁已经被释放掉了,那么将exclusiveOwnerThread清空并返回true。
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//线程的唤醒,非常非常非常核心的代码,本质就是从CLH队列中找到下一个有效头节点唤醒它
private void unparkSuccessor(Node node) {
//将当前头节点的状态进行修改,设为0。
//这里的意义我也没看懂,可能是怕多线程同时执行release方法,这样CAS操作设为0后就避免别的线程也操作后续代码
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//这是核心逻辑代码,这里的目的就是从CLH队列中找出新的有意义头节点(会有些没意义的节点,比如状态为CANCEL的节点)
Node s = node.next;
//这里的逻辑核心是指找到最接近node(此时node是头节点)的有意义节点
//找的方法是倒着找,从尾巴开始找,一旦找到有意义的就赋给s,然后再往前找,越来越逼近node,最后s就是最逼近node的有意义节点
//我们需要先理解变量s现在等于node.next,由于node已经执行完释放锁了,那么node也就没了意义,此时node.next就是这个CLH的头节点,但这个头节点有没有意义呢?如果s==null或者s.waitStatus>0,此时s就是无意义节点,就需要寻找真正的有意义节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//找到头节点后,唤醒头节点
if (s != null)
LockSupport.unpark(s.thread);
}
这里可能需要解释的为:
tryRelease
中的if(c==0)
,这里可能很多人会有疑问,既然是独占锁,那么应该只有一个线程会持有锁,c应该只等于1,那么c-1不一定等于0吗,为什么还判断c==0。这个原因很简单,就是可重入,在可重入的时候c也会增加。如果可重入的锁释放了,并不代表没有线程占用锁了,可能这个线程还在占用,没完全释放锁。
看完了ReentrantLock的加锁与释放锁源码,我们再举个例子说明一下多线程下使用独占锁时线程的生命周期:
假设你是一个线程小Y,你需要执行一段代码,但这段代码被上了锁,每个线程都必须拿到锁才能执行。也即你需要先执行
lock.lock()
这代表申请锁,当你执行后,会先判断当前锁有没有被别人持有(也即这段代码有没有别的线程在运行)。很不幸,你没能获得锁,那按照规则,你被包装成了一个节点,加进了等待队列,由于你是后来的你进了队列的最尾端。这一进队列不要紧,你发现原来队列里有那么多跟你一模一样的兄弟,它们也都在排着队等待获得锁然后执行代码。
现在队列中正在获得锁执行的是线程A。
由于你没有获得锁,也不是当前的头节点,很快有一个凶神恶煞的管理员过来跟你说:“小子,赶紧给我睡会,别在这不干活又消耗CPU的”,你也不敢多言,只好准备睡了。只不过在睡之前,你发现你前面的兄弟早就已经熟睡了,并且并不知道什么时候,你前面兄弟的节点状态由0改为了-1。
注 就是对应这段代码:
for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; }
你也不知道睡了多久,总之当你醒来的时候,现在队列已经是这样了:
你睡眼朦胧的问你的前置节点X大哥,是它把你唤醒的吗,X回答是的,并说道:现在到你了小Y,你现在是新的头节点了,你可以获得锁执行代码了。
由于从刚才睡着的位置被唤醒,因此你还是在执行
for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; }
不同的是,你现在是新的头节点了,此时再一次for循环的时候,你会进入第一个if分支,你将自己设为头节点代表获得了锁,然后开始噼里啪啦的执行加锁的那段代码。
当你执行完代码后,紧接着执行一个叫
lock.unlock()
的代码,有人告诉你,这是代表你现在执行完了,可以释放锁了,然后你要像X大哥那样,也唤醒你的后继节点,让它持有锁来执行代码吧。你如是照做,释放了锁,并执行
unparkSuccessor
函数,唤醒了你的后继节点Z。此时Z睡眼惺忪的问你:是你把我唤醒的吗Y大哥。你摸摸它的头,觉得这个场景似曾相识,然后回答道:是的,现在到你了小Z,你现在是新的头节点了,你可以获得锁执行代码了。
3.3 Condition
很多时候我们的线程同步关系不仅是锁竞争那么简单,有时还附带条件。
比如现在有一个生产线程和一个消费线程,生产线程会生产资源到资源池,消费线程也会从资源池中消费资源。但资源池是有一定大小限制的。当资源池满了以后,生产线程就需要停止生产,而一旦消费线程消费掉资源导致资源池又有空位后,就可以唤醒生产线程继续生产。同样的道理,当资源池没资源可消费时,消费者线程就需要停止消费,而一旦生产者线程又生产新资源后就可以唤醒消费者线程继续消费。
可以看到,上述案例存在两个线程间在一定条件的同步操作,这时就可以使用我们的Condition
,Condition就是条件变量。
public class BreadDemo {
private ReentrantLock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
private Object[] breadHouse;
private int breadHouseCapacity;
private int head = 0;
private int tail = 0;
private int count = 0;
public BreadDemo() {
this(10);
}
public BreadDemo(int breadHouseCapacity) {
this.breadHouseCapacity = breadHouseCapacity;
this.breadHouse = new Object[breadHouseCapacity];
}
public Object consumer() throws InterruptedException {
lock.lock();
try {
while (count==0){
notEmpty.await();
}
Object bread = breadHouse[head];
breadHouse[head] = null; //help GC
if(++head==breadHouseCapacity){
head=0; //回到数组起始 循环使用
}
--count;
System.out.println(Thread.currentThread().getName()+" 消费了一个面包,目前剩余:"+count);
notFull.signalAll();
return bread;
}finally {
lock.unlock();
}
}
public void product(Object bread) throws InterruptedException{
lock.lock();
try {
while (count==breadHouseCapacity){
notFull.await();
}
breadHouse[tail] = bread;
if(++tail==breadHouseCapacity){
tail = 0;
}
++count;
System.out.println(Thread.currentThread().getName()+" 生产了一个面包,目前剩余:"+count);
notEmpty.signalAll();
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
BreadDemo breadDemo = new BreadDemo();
Thread[] threads = new Thread[100];
for(int i = 0; i < 50; i++){
threads[i] = new Thread(()->{
try {
breadDemo.consumer();
breadDemo.consumer();
Thread.sleep(100);
breadDemo.consumer();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
for(int i = 50; i < 100; i++){
threads[i] = new Thread(()->{
try {
Thread.sleep(100);
breadDemo.product(new Object());
Thread.sleep(100);
breadDemo.product(new Object());
Thread.sleep(100);
breadDemo.product(new Object());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
for(Thread thread:threads){
thread.start();
}
for(Thread thread:threads){
thread.join();
}
System.out.println("finish");
}
}
如上所示,我们使用ReentrantLock和Condition创建了一个面包工厂,并同时让50个线程生产面包和50个线程消费面包。
我们通过Lock创建了两个Condition,一个是notFull
条件,一个是notEmpty
条件。一旦面包房面包满了后,notFull
条件就不满足,因此需要阻塞notFull.await()
。而一旦消费一个面包后,当前面包房又是notFull
状态了,因此通过notFull.signal()
唤醒因为notFull
条件被阻塞的生产者。
当我们调用lock.newCondition()
时,其实是创建了一个ConditionObject
对象,ConditionObject
是AQS
类的内部类。
其实ConditionObject
也可以算得上是一个小的AQS,它里面也维护了一个队列:
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
如上是ConditionObject
的属性,它里面也维持了一个链表,这个链表不同于CLH队列,这个链表只维护由于条件变量不满足被挂起的线程。我们不妨将这个队列称为条件队列。
3.3.1 await
public final void await() throws InterruptedException {
//判断线程中断
if (Thread.interrupted())
throw new InterruptedException();
//将当前线程以尾节点的形式加入到条件队列
Node node = addConditionWaiter();
//释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//休眠挂起
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//处理中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//醒来后重新获得锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//清理队列
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//处理中断
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
//这段代码是在将当前线程封装为Node节点并以尾节点的形式加入到条件队列
private Node addConditionWaiter() {
Node t = lastWaiter;
//如果尾节点不为空,且状态不是CONDITION(加入到条件队列的节点状态应该都是CONDITION,不是的代表线程被取消了)
if (t != null && t.waitStatus != Node.CONDITION) {
//执行条件队列的清理工作(剔除无意义节点)
unlinkCancelledWaiters();
t = lastWaiter;
}
//将当前节点以状态为CONDITION的形式加入到队列尾部
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//如果尾节点为null,则说明队列没东西,因此头尾节点都是当前节点,条件队列是没有傀儡节点的
if (t == null)
firstWaiter = node;
else
//当前队列为尾节点
t.nextWaiter = node;
lastWaiter = node;
return node;
}
//清理工作,剔除整个队列中无意义的节点,剔除的方法是遍历链表
//核心是单链表的三指针算法,t指针指向当前节点,trail指针指向当前节点的前置节点,而next指针指向当前节点的后置节点
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
//trail是当前节点的前置节点
Node trail = null;
//从头开始遍历
while (t != null) {
Node next = t.nextWaiter;
//当前节点被取消,则从队列中删除当前节点
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
//fullyRelease是AQS的方法,用于完全的释放锁(比如对于可重入锁,就得多释放几次)
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
这里需要重点解释下fullyRelease()
,首先我们得明白,会执行condition.await()
方法的线程都是已经获得了锁的,因为Condition
与Lock
之间的配合关系是:
lock.lock();
try{
//...
condition.await();
//...
}finally{
lock.unlock();
}
也即必须要先获得锁,那为何又要在条件不满足而阻塞挂起来的时候释放掉锁呢?
我们假设现在面包房里没面包了,一个消费线程获得锁想消费面包,但由于没有面包,因此await()
挂起,此时如果不释掉锁,其他任何线程都是无法进入临界区代码执行的,包括生产线程(生产线程和消费线程是同一个锁),这样也就造成了死锁。
因此如果条件不满足,就需要释放锁。同样的,如果某个时刻条件满足了,线程由于signal()
被唤醒了,那是继续执行吗,还是先获得锁再执行,答案是先获得锁再执行。
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
由于我们传入的参数Node的状态就是CONDITION
,因此直接返回false,那么也就代表当前线程需要挂起休眠。
唤醒后,线程会执行acquireQueued(node, savedState)
来重新获得锁(考虑了可重入,因此获得锁的次数与释放次数相等)。最后还顺带清理了一下条件队列,剔除了里面被取消的节点。
3.3.2 signal
public final void signal() {
//判断下当前调用signal的线程是不是持有锁的线程,如果不是抛异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//拿到队列的第一个节点,准备唤醒
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
//这是在从队列中删除第一个节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//这里采用do while,如果设置失败,代表当前节点被取消了,就换成下一个节点继续
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
//将状态由CONDITION置为0,如果设置失败则代表当前节点已经被取消了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//上面其实就是让这个节点从条件队列里删除,然后重新加入到CLH队列,重新获得锁运行
//其中p是当前节点在CLH队列中的前置节点
Node p = enq(node);
int ws = p.waitStatus;
//如果当前节点的前置节点状态<=0(未被取消),则将其设置为-1,代表后续有需要被唤醒的节点
//否则 否则代表的是前置节点状态 >0 或者设置-1失败,此时直接唤醒当前线程(这代表CLH队列中当前节点已经是实际头节点了)
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
signalAll()
与doSignal()
类似,只不过会尝试将所有的条件节点从条件队列中删除移到CLH队列里:
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
//遍历所有的节点做transferForSignal操作
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
可以看到所谓await()
就是将节点加入条件队列并释放锁然后挂起。而所谓signal()
就是将节点从条件队列里移除,重新加入到CLH队列获取锁。
当获取锁成功后,线程被唤醒,此时我们仍然需要重新判断当前是否满足条件,也即:
while (count==0){
notEmpty.await();
}
这是因为假设我们生产了一个面包但用了signalAll()
,此时所有消费者线程都被移到CLH队列争抢锁,第一个抢到锁的会消费掉面包,后续抢到锁的由于面包没了还得挂起,因此我们需要while
的形式判断。不满足就挂起,醒来后继续判断。
3.4 CountDownLatch
ReentrantLock是一个典型的独占锁,而CountDownLatch是共享锁的思想。独占锁和共享锁其实都可以做到线程间的同步机制。CountDownLatch核心功能也是同步多个线程。
假设我们需要等待某个(或多个)条件成熟,在这个条件(或所有条件)未成熟前,所有依赖于这个条件的线程都需要阻塞,一旦条件成熟,这些挂起的线程就都可以唤醒执行。
CountDownLatch翻译为闭锁。一个闭锁相当于一扇大门,在大门打开之前所有线程都被阻断,一旦大门打开所有线程都将通过,但是一旦大门打开,所有线程都通过了,那么这个闭锁的状态就失效了,门的状态也就不能变了,只能是打开状态。也就是说闭锁的状态是一次性的,它确保在闭锁打开之前所有特定的活动都需要在闭锁打开之后才能完成。
//创建 CountDownLatch,下面代码的CountDownLatch均是这个创建的对象
CountDownLatch countDownLatch = new CountDownLatch(2);
//线程1.创建100个线程,挂起等待两个条件成熟,两个条件均成熟后才被唤醒执行
public void test(){
for(int i = 0; i < 100; i++){
Thread thread = new Thread(()->{
System.out.println("条件未成熟,线程被挂起阻塞");
countDownLatch.await();
System.out.println("条件成熟,线程被唤醒执行");
});
thread.start();
}
}
//线程2. 条件1成熟
public void notify1(){
countDownLatch.countDown();
}
//线程3. 条件2成熟
public void notify1(){
countDownLatch.countDown();
}
上述代码中,先创建了一个需要等待两个条件的CountDownLatch。
线程1创建了100个线程,这100个线程执行countDownLatch.await()
,代表会一直等待,等到countDownLatch的两个条件均成熟才继续执行。
线程2执行countDownLatch.countDown();
,表示一个条件成熟
线程3执行countDownLatch.countDown();
,表示另一个也成熟。
此时两个条件均成熟,挂起的100个线程均会被唤醒然后执行自己后续的代码。可以看到我们通过CountDownLatch实现了100个线程的同步,让100个线程均阻塞等待,直到两个条件成熟,一旦条件成熟,100个线程均会被唤醒(不像独占锁一个一个唤醒)。
下面我们看一下CountDownLatch是如何实现await()和countDown()的。
3.4.1 await
//当创建一个闭锁时,传入count,这个count值会传给AQS的state值
//AQS的state值就是我们之前说的锁的状态位,比如在独占锁中我们进行lock和unlock的时候也是在修改这个state值(+1,-1)
//在CountDownLatch中,count的数量就是需要等待条件的数量,只有所有条件均成熟,才能放行等待的线程。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
protected final void setState(int newState) {
state = newState;
}
//调用await操作本质是调用sync.acquireSharedInterruptibly(1);sync是CountDwonLatch内部类
//当前方法是先判断是否满足获得锁(其实没有锁的概念,就是看state是否等于0,如果等于0就代表无需等待,因为没有需要等待的条件,条件均成熟了),如果不满足就将自己挂起
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//判断自己是否满足获得锁,如果state==0代表等待条件全部满足 返回正值代表获取锁成功否则代表获取锁失败
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//走到这里必然代表获取锁失败(有未成熟的条件,需要挂起等待),所以同独占锁,将自己变为节点加入到CLH队列自旋挂起
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//addWaiter方法与独占锁代码相同,都是将当前线程变为节点加入到CLH队列的末尾
//只不过对于独占锁,是以Node.SHARED方式加入,而同步锁是null
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
//与独占锁挂起相同,for循环不断尝试挂起
for (;;) {
//如果当前节点是真实头节点,那么调用tryAcquireShared尝试获取锁
final Node p = node.predecessor();
if (p == head) {
//tryAcquireShared会返回1或-1,上面我们已经看了它的源码,1代表获取锁成功(无等待条件),-1代表获取锁失败(有等待条件)
int r = tryAcquireShared(arg);
if (r >= 0) {
//如果获取锁成功,将当前节点设为新的头结点并唤醒继任节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//如果当前线程不是头结点或者获取锁失败,判断当前线程是否应该挂起
//这段代码也许独占锁完全相同,就是清理CLH队列的节点,如果前面节点是SIGNAL状态,那么就将自己挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
//这里也是一样的,如果线程中断就将这个节点CANCELLED
if (failed)
cancelAcquire(node);
}
}
//设置新的头结点 将当前节点设为头结点并且唤醒继任节点,走到这其实propagate必然等于1
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
//获取继任节点,并判断如果是共享模式,就执行doReleaseShared唤醒继任节点。
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
//唤醒头结点的继任节点
//这里的逻辑有些复杂,但核心就是走到unparkSuccessor(h)这句话
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒继任节点,这个函数我们在独占锁讲过
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
3.4.2 countDown
//countDownLatch的countDown
//本质就是将state状态位-1然后如果减为了0 就调用doReleaseShared,唤醒CLH队列头结点的继任节点
public final boolean releaseShared(int arg) {
//尝试将当前状态位-1 如果状态位归0返回true否则返回false
if (tryReleaseShared(arg)) {
//尝试释放当前头结点的继任节点
doReleaseShared();
return true;
}
return false;
}
//将state状态位-1,如果state==0就返回true,否则返回false
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
//通过CAS操作将state-1
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
现在我们就看完了CountDownLatch的源码,那么我们可以对比一下独占锁和共享锁:
首先在ReentrantLock中,锁的状态位state代表当前持有锁的线程数量,但在CountDownLatch中代表需要等待的条件的数量。
其次独占锁的逻辑是这样的:
lock.lock();
dosomething();
lock.unlock();
因此当独占锁获得锁后会先执行自己的业务代码,在执行完业务代码后再释放锁唤醒下一个线程。所以这个流程就是:获得锁->执行业务代码->释放锁唤醒下一个线程->获得锁->执行业务代码->释放锁唤醒下一个线程-> ......
但共享锁完全不同,共享锁在获得锁后,会先唤醒下一个线程再执行自己的代码,也即:
获得锁->唤醒下一个线程后再执行自己的业务代码->获得锁->唤醒下一个线程后再执行自己的业务代码->获得锁->唤醒下一个线程后再执行自己的业务代码-> ......
这样一旦头节点获得了锁,大家的所有逻辑都是先唤醒继任节点再干自己的事,因此CLH队列内的所有阻塞线程会在很快的时间均被唤醒。这样就会出现一种一旦锁释放,所有线程都可以运行自己代码的现象,也就可以理解为所有线程共享一个锁。
独占锁和共享锁最大的区别就是唤醒继任节点的时间点。
3.5 CyclicBarrier
想象这样一个场景:
过年了,大家一起都等着吃年夜饭。现在规定饭没做好的时候大家都会等它做好再吃,如果已经做好了就可以直接吃。假设年夜饭会在六点钟做好,小孩5点就开始等了,5点半的时候女人也开始加入一起等的人里面,一直到六点,这时老人也来了,饭正好也做好,大家不用等其他人,饭做好了就直接吃。
上面这个场景我们可以很容易的使用一个CountDownLatch模拟出。CountDownLatch只需要一个条件,就是年夜饭做好。每个小孩,老人或女人都是一个单独的线程,小孩在5点的时候await(),女人在5点半的时候await(),老人在6点的时候await()。CountDownLatch在6点的时候执行countDown()此时所有被等待的人均可以开始吃饭了。
但在七点的时候男人回来了,男人回来后提出了一些新的规矩:
男人觉得年夜饭大家应该一起开动一起吃,怎么能不等他们就直接开吃呢?因此男人规定,不管年夜饭什么时候做好,只要人凑齐了,大家就可以开始吃了。
现在难办了,因为CountDownLatch只会关心等待的条件有没有成熟,但同时有多少线程在等待(人有没有齐)并不关心。此时就是CyclicBarrier大展身手的时候了,Barrier翻译栅栏或屏障,就好像前面有一个屏障,只有大家都到齐了,才能手拉手一起过去,谁走的快一点谁就在屏障点那等着。上面吃饭的场景代码如下:
/**
* @author coderZoe
* @date 2022/6/23 13:26
*/
public class NewYearDinner {
private final CyclicBarrier cyclicBarrier;
private final int count;
public NewYearDinner(int count) {
this.count = count;
this.cyclicBarrier = new CyclicBarrier(count);
}
public void readyToEat(){
new Thread(() ->{
try {
System.out.println("准备吃饭");
cyclicBarrier.await();
System.out.println("太好了,大家都准备好了,可以吃饭了");
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}).start();
}
//测试方法
public static void main(String[] args) throws InterruptedException {
int personCount = 10;
NewYearDinner newYearDinner = new NewYearDinner(personCount);
for(int i = 0; i < personCount; i++){
newYearDinner.readyToEat();
Thread.sleep(1000);
}
}
}
CyclicBarrier在构建的时候需要传参count
,这代表需要等待的线程数,只有一共count个线程在等待时,才代表到齐了,可以放行了。
我们之前在CountDownLatch时讲过:
一旦大门打开,所有线程都通过了,那么这个闭锁的状态就失效了,门的状态也就不能变了,只能是打开状态。也就是说闭锁的状态是一次性的,它确保在闭锁打开之前所有特定的活动都需要在闭锁打开之后才能完成。
CyclicBarrier中有一个单词是Cyclic,翻译为循环或周期的,这代表CyclicBarrier可以循环的时候,不仅只能用一次,这也是与CountDownLatch的不同。
还以上面的吃年夜饭为例:
现在大家都已经开始吃年夜饭了,小孩子一般吃的快一些而男人与老人一般吃的慢一些,吃完饭后往往要放烟花,自然而然的,放烟花也是要等大家一起的,先吃完的必须等待还没吃完的,等到大家都吃完了,才能放烟花。
可以看到放烟花也是一个CyclicBarrier的使用场景,我们其实可以复用之前的CyclicBarrier,其代码如下:
package cn.com.coderZoe.Module10JUC.cyclicbarrier;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* @author coderZoe
* @date 2022/6/23 13:26
*/
public class NewYearDinner {
private final CyclicBarrier cyclicBarrier;
private final int count;
public NewYearDinner(int count) {
this.count = count;
this.cyclicBarrier = new CyclicBarrier(count);
}
public void readyToEat(){
new Thread(() ->{
try {
System.out.println("准备吃饭");
cyclicBarrier.await();
System.out.println("太好了,大家都准备好了,可以吃饭了");
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}).start();
}
public void readyToFireworks(){
new Thread(() ->{
try {
System.out.println("准备放烟花");
cyclicBarrier.await();
System.out.println("太好了,大家都准备好了,可以放烟花了");
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}).start();
}
//测试函数
public static void main(String[] args) throws InterruptedException {
int personCount = 10;
NewYearDinner newYearDinner = new NewYearDinner(personCount);
for(int i = 0; i < personCount; i++){
newYearDinner.readyToEat();
Thread.sleep(1000);
}
for(int i = 0; i < personCount; i++){
newYearDinner.readyToFireworks();
Thread.sleep(1000);
}
}
}
可以看到我们使用了同一个CyclicBarrier。
在讲CyclicBarrier#await()
实现之前,我们先来看下CyclicBarrier的部分源码实现:
CyclicBarrier的属性:
//独占锁
private final ReentrantLock lock = new ReentrantLock();
//条件变量 用于线程间的同步
private final Condition trip = lock.newCondition();
//需要彼此等待的线程的个数
private final int parties;
//在一组任务执行完,就会执行这个runnable任务
private final Runnable barrierCommand;
//代; 一组任务代表一代或者说一次CyclicBarrier的使用代表一代
private Generation generation = new Generation();
//当前还需要等待的线程数 如果为0就代表无需等待,所有人都完成了。
private int count;
CyclicBarrier的构造方法
//构造方法,parties代表要彼此等待的线程数,同时支持传入一个runnable方法,在最后一个线程到达屏障点后会执行这个方法,如果为空则不执行
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
通过上面的部分源码和例子,我们可以总结CyclicBarrier特性如下:
- await()方法将挂起线程,直到同组的其它线程执行完毕才能继续
- await()方法有一个返回值,会返回一组内还剩多少线程未到屏障点
- 可循环使用
- CyclicBarrier 的构造函数允许携带一个任务,这个任务将在0%屏障点执行,它将在await()==0后执行。
3.5.1 await
await
有几个重载方法,其本质都是会调用到dowait()
方法,可以看到dowait()
才是核心方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
//实际执行线程阻塞和挂起的核心方法
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,TimeoutException {
//整个dowait操作都是线程安全的,ReentrantLock将整个函数包了起来
//原因是组内的线程调用await肯定是并发调用的,为了保证对CyclicBarrier内数据的访问和修改是线程安全的,所有加独占锁。
final ReentrantLock lock = this.lock;
lock.lock();
try {
//当前的代或者当前组
final Generation g = generation;
//异常判断
if (g.broken)
throw new BrokenBarrierException();
//如果当前调用线程中断,直接中断整个这一组的CyclicBarrier
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//因为调用dowait时必然是指到了屏障点,此时将count-1
int index = --count;
//如果是0代表所有一组内的所有线程都到了屏障点
if (index == 0) { // tripped
boolean ranAction = false;
try {
//因为count现在等于0,那么代表当前正在执行await的线程是到达屏障点的最后一个线程
//最后一个线程需要执行一开始构造方法放进来的barrierAction,
//就是我们刚才说的每组任务执行完会由最后一个线程执行barrierAction,对应上面特性4
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//因为当前组都执行完了,所以需要唤醒其他线程,并且复位CyclicBarrier供下一次使用。
nextGeneration();
return 0;
} finally {
//这里是判断执行barrierAction失败了,如果执行失败也会终止当前一组的CyclicBarrier
if (!ranAction)
breakBarrier();
}
}
//会走到这代表当前await线程不是最后一个到达屏障点的线程,他就需要挂起等待。
for (;;) {
try {
//如果不关心超时,直接执行条件变量的await将自己挂起
if (!timed)
trip.await();
//否则就使用条件变量的awaitNanos判断是否超时
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//要是线程被中断了(或超时了,其实也是被中断),就中断掉当前barrier,即结束本组。
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
//异常处理 如果本轮被挂掉了,就抛出异常
if (g.broken)
throw new BrokenBarrierException();
//会走到这代表正常被唤醒了(被最后一个线程全唤醒了,g!=generation的原因是因为最后一个线程new了一个新的Generation)
//返回自己的序号就可以了
if (g != generation)
return index;
//如果超时就抛出超时异常并终止当前一组的CyclicBarrier
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
//当最后一个线程到达屏障点时会调用这个函数
private void nextGeneration() {
//通知其他线程 放行大家
trip.signalAll();
//复位
count = parties;
generation = new Generation();
}
//中断和复位
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
可以看到CyclicBarrier对于多线程的同步是通过Lock与Condition这一组合实现的。
3.6 Semaphore
Semaphore也即信号量,很多同学可能在学习操作系统的时候学习过它,如果没学过也没关系。Semaphore最多的使用场景就是池化,比如我们熟知的线程池,连接池都是用到了池化技术。
Semaphore 是一个计数信号量。从概念上讲,信号量维护了一个许可集。在许可可用前会阻塞每一个 acquire()
的线程。每个 release()
添加一个许可,从而可能释放一个正在阻塞的获取线程。但是,Semaphore不使用实际的许可对象, 只对可用许可的号码进行计数,并采取相应的行动。
也即Semaphore是一个计数器,每有一个请求计数器会减1,每次释放计数器就会+1,在计数器大于0时允许线程通过,在计数器等于0时会阻塞线程,即使已经获得许可的线程也会阻塞,因此Semaphore是不可重入的。
我们可以通过Semaphore实现一个对象池:
//对象池
public class ObjectPool <T>{
public interface ObjectFactory<T>{
T makeObject();
}
class Node{
T obj;
Node next;
}
private final int capacity;
private final ObjectFactory<T> factory;
private final Lock lock = new ReentrantLock();
private final Semaphore semaphore;
private Node head;
private Node tail;
public ObjectPool(int capacity, ObjectFactory<T> factory) {
this.capacity = capacity;
this.factory = factory;
this.semaphore = new Semaphore(this.capacity);
this.head = null;
this.tail = null;
}
//从池内获取一个资源
public T getObject() throws InterruptedException {
semaphore.acquire();
return getNextObject();
}
//将一个资源返回给资源池
public void returnObj(T t){
returnObjToPool(t);
semaphore.release();
}
private T getNextObject(){
lock.lock();
try {
if(head==null){
return factory.makeObject();
}else {
Node ret = head;
head = head.next;
if(head==null){
tail=null;
}
ret.next = null;
return ret.obj;
}
}finally {
lock.unlock();
}
}
private void returnObjToPool(T t){
lock.lock();
try {
Node node = new Node();
node.obj = t;
if(tail==null){
head = tail = node;
}else {
tail.next = node;
tail = node;
}
}finally {
lock.unlock();
}
}
}
上面的对象池逻辑比较简单,首先对象池内部维护了一个链表,链表代表了对象池,在构造方法时传入对象池的大小,并构造Semaphore。其次通过Semaphore来控制资源的获取数量,只有当前被获取的资源小于capacity,才能进入getNextObject()
获取资源,对象的获取就是返回对象池的头结点。当对象返回给对象池时就是执行将节点加入对象池的尾端并执行semaphore.release(),当对象池的对象用完后再请求会被阻塞直到有新的对象被返回回来。
3.6.1 信号量与独占锁
想一下如果我们将Semaphore的初始化大小设为1,即只允许有一个可用的许可,如果现在有人在用,那必须等到它返回其他人才能用。这样看信号量起到了一个互斥锁的作用。这种信号量我们称为二进制信号量(二值信号量),因为只有两个状态可用和不可用。所以与锁相似,信号量也存在公平信号量和非公平信号量,对于公平信号量就是说一定会按请求的顺序获取锁而非公平信号量就是先获取如果获取不得再加入队列等待。
3.6.2 信号量与条件变量
信号量其实还可以作为条件变量使用,我们假设现在有两个线程A和B,A调用信号量的acquire()
希望能够阻塞挂起,然后B调用信号量的release()
将挂起的A唤醒,这就类似于A线程在等待条件B。在这种情况下,信号量的初始值需要赋为0,这似乎很奇怪,一个可用资源是0的信号量。其实只有这样,当A执行acquire()
才会被挂起,此时信号量-1,然后B调用release()
,此时信号量+1,线程A被唤醒。
注:在JUC下无法通过信号量为0实现条件变量,因为JUC下的信号量在唤醒线程的时候做了是否有可用资源的判断。之所以加进来信号量与条件变量是因为笔者在阅读《操作系统导论》时书中提及的知识,笔者认为可能是不同语言下信号量的实现不同,而且信号量为0这一思路也很新奇,因此加进了笔记。
3.6.3 acquire
//当调用这里的时候会走到Sync(内部类)的acquireSharedInterruptibly
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//这里的意思是说线程会优先尝试获取锁,如果获取成功(计数器大于等于0,即存在可用资源)就直接返回,否则阻塞挂起线程。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//tryAcquireShared会返回当前池内可用资源,如果小于0说明池内无可用资源
if (tryAcquireShared(arg) < 0)
//如果池内无可用资源,就将线程挂起,否则直接返回
doAcquireSharedInterruptibly(arg);
}
//对于公平锁和非公平锁其区别就在tryAcquireShared这个函数
//与ReentrantLock的相同信号量的公平锁和非公平锁也是在获取锁的一瞬间有区别,非公平锁会先尝试获取,获取成功就直接用,不成功再入队列,公平锁会先判断自己是不是头节点
//公平锁
protected int tryAcquireShared(int acquires) {
for (;;) {
//与Reentrant公平锁内的实现相似
//如果不是头结点(当前节点有前置节点) 直接返回-1
if (hasQueuedPredecessors())
return -1;
//走到这代表当前节点有可能是头节点,具有获得锁的权力
//获得state,将state减1,如果剩余小于0则直接返回,否则将state的状态设为减去1后的值(CAS加for)
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
//非公平锁
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
//直接获取,如果获取失败(剩余小于0)就返回,否则就不断获取直到通过CAS操作将state设为减1后的值成功返回
//也即与公平锁的区别是不需要考虑当前节点是否是头结点
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
上面需要解释的是for(;;)
,之所以加for循环是因为如果当前remaining不小于0且CAS操作失败,那么可以继续重新执行。
doAcquireSharedInterruptibly()
函数源码在CountDownLatch时已经介绍过。
3.6.4 release
//释放资源
public void release() {
sync.releaseShared(1);
}
//尝试释放锁,释放成功后唤醒头结点的继任节点
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
//这里说过很多次了,共享模式下的继任节点唤醒
doReleaseShared();
return true;
}
return false;
}
//就是简单的将sate+1然后通过CAS操作设置state的值
protected final boolean tryReleaseShared(int releases) {
//为避免CAS操作失败,for循环不断尝试
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
doReleaseShared
在CountDownLatch中讲过。
3.6.5 一些总结
Semphore的源码已经看完了,简单来讲还是内部维持的一个state,每次请求资源的时候就执行state-1操作,当state-1小于0就挂起线程,当state-1大于等于0就将state设为sate-1,然后返回。当资源释放的时候就执行state+1操作并唤醒继任节点。
还回到锁本身讨论,这些锁或锁的应用类本质都是维护了state值,这个值得是线程安全的,所以通过CAS操作这个值。
对于ReentrantLock,这个state代表的是持有线程的锁的个数,如果state大于0并且不是正在拥有锁的线程就挂起,当unlock时state-1,并唤醒阻塞线程(如果state==0才唤醒)。对于CountDownLatch和CyclicBarrier这个state代表的是剩余条件个数,state的值大小是初始化时指定的。每有一个条件完成state就-1,当减为0代表条件成熟唤醒所有阻塞线程。对于Semaphore这个state代表的是资源可用数,也是在初始化时指定。每次资源请求时state会-1,当资源减为0还在请求时就会被挂起等待,而每次资源返回时state就会+1,并且唤醒等待的节点。
对于线程的唤醒,ReentrantLock、CountDownLatch、CyclicBarrier和Semaphore本质都是相同的,就是一旦满足条件就唤醒CLH队列的首个线程,不同之处在于唤醒的时机。
ReentrantLock是在执行完临界代码,然后unlock的时候才唤醒,即临界代码执行完才会释放锁,符合独占的思想。而对于CountDownLatch或Semaphore,会在线程一旦获得锁后也唤醒继任节点,符合共享的思想(一人得到锁,通知其他人也试着去得到)。
举个例子,如果是ReentrantLock,一个线程获得锁后会先执行自己的代码,执行完后会手动释放锁,释放锁时唤醒继任节点,继任节点也重复上述操作,每个时间只有一个线程拥有锁。而对于CountDownLatch,一个线程获得锁后代表条件成熟,他会先唤醒其他线程再执行自己的操作,当继任线程被唤醒,也会尝试获得锁,自然也可以得到锁,那么一个传一个,CLH队列内的所有节点都会被唤醒,大家醒来都是先唤醒别人再干自己的事,所以CountDownLatch是一醒都醒。Semaphore有一点不同,他也是共享锁,一个线程获得锁后会先唤醒继任节点再做其他事,但不同于CountDownLatch所有继任节点都可以获得锁,Semaphore的继任节点可能不一定能获得锁,这时这个不能获得锁的节点的后续节点均不能被唤醒了,即Semaphore不能做到一醒都醒,只能是一醒,在一定范围内的节点都醒(醒来的节点数等于可获取资源数,非公平锁不算)。
3.7 ReadWriteLock
读写锁是一种使用非常频繁的锁,比如MySQL事务的串行化隔离级别就是通过读写锁实现的。读写锁的思想很简单,读锁是一个共享锁,也即读操作可以多线程同时进行。写锁是一个独占锁,写操作需要串行的进行。
JUC中读写锁的接口如下:
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
同时其具体实现为ReentrantReadWriteLock
,我们以一个例子来说明读写锁的使用方法:
public class SimpleConcurrentMap<K,V> implements Map<K,V> {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
private final Map<K,V> map;
public SimpleConcurrentMap(Map<K, V> map) {
this.map = map;
}
@Override
public int size() {
readLock.lock();
try {
return map.size();
}finally {
readLock.unlock();
}
}
@Override
public boolean isEmpty() {
readLock.lock();
try {
return map.isEmpty();
}finally {
readLock.unlock();
}
}
@Override
public boolean containsKey(Object key) {
readLock.lock();
try {
return map.containsKey(key);
}finally {
readLock.unlock();
}
}
@Override
public boolean containsValue(Object value) {
readLock.lock();
try {
return map.containsValue(value);
}finally {
readLock.unlock();
}
}
@Override
public V get(Object key) {
readLock.lock();
try {
return map.get(key);
}finally {
readLock.unlock();
}
}
@Override
public V put(K key, V value) {
writeLock.lock();
try {
return map.put(key,value);
}finally {
writeLock.unlock();
}
}
@Override
public V remove(Object key) {
writeLock.lock();
try {
return map.remove(key);
}finally {
writeLock.unlock();
}
}
@Override
public void putAll(Map<? extends K, ? extends V> m) {
writeLock.lock();
try {
map.putAll(m);
}finally {
writeLock.unlock();
}
}
@Override
public void clear() {
writeLock.lock();
try {
map.clear();
}finally {
writeLock.unlock();
}
}
@Override
public Set<K> keySet() {
readLock.lock();
try {
return map.keySet();
}finally {
readLock.unlock();
}
}
@Override
public Collection<V> values() {
readLock.lock();
try {
return map.values();
}finally {
readLock.unlock();
}
}
@Override
public Set<Entry<K, V>> entrySet() {
readLock.lock();
try {
return map.entrySet();
}finally {
readLock.unlock();
}
}
}
利用读写锁我们实现了一个简单的并发Map,可以看到这里的使用很简单,就是要读的时候加读锁,要写的时候加写锁。ReadWriteLock需要严格区分读写操作,如果读操作使用了写入锁,那么降低读操作的吞吐量,如果写操作使用了读取锁,那么就可能发生数据错误。
接下来我们看下读写锁的实现:
首先读写锁内部属性如下:
//读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
//写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
//锁
final Sync sync;
ReadLock和WriteLock是ReentrantReadWriteLock的两个内部类,我们在执行lock.readLock()
和lock.writeLock()
的时候就是在获取readerLock和writerLock:
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
Sync也是ReentrantReadWriteLock的内部类,其底下有两个具体实现类FairSync和NonfairSync(这与ReentrantLock和Semaphore等相似)。
读锁和写锁内部都有一个Sync属性:
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
}
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
}
而Sync是ReentrantReadWriteLock的内部抽象类,其有两个实现类是公平锁和非公平锁也是ReentrantReadWriteLock的内部类(这与ReentrantLock一模一样)。Sync继承自AQS,即从这里我们可以看出,Sync(及其实现类)才是那个实际能做锁操作的对象。
ReentrantReadWriteLock
的构造方法如下,默认构造方法是非公平锁(与ReentrantLock一模一样)
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
这里有个非常关键的信息,在构造ReadLock和WriteLock时,我们传入的是同一个对象(this),那么也就代表writerLock和readerLock对象持有的sync属性是同一个对象,我们之前说过sync才是那个根本的锁,既然写锁是独占锁,读锁是共享锁,那为什么可以持有同一个锁对象呢?也即为什么读写锁是同一把锁呢?
之前的学习中,我们知道锁判断的本质就是AQS里的state标志位,state标志位标识了当前线程使用锁的情况,但现在我们将一个锁劈成了读锁和写锁两个锁来用,读线程和写线程肯定有不同的判断逻辑,那我们要如何记录呢?ReentrantReadWriteLock的实现方式是将state拆为高16位字节和低16位字节(state是个32字节的int变量),高16位用来描述读锁,低16位用来描述写锁的情况,我们知道16位能表示的最大整数是65535。所以无论是读锁还是写锁,能描述的最大持有锁的线程只能是65535个(其实写锁影响不大,因为写锁是独占锁,除非不断的重入)。
了解了这个之后,我们再来看读写锁的加锁和锁释放源码:
3.7.1 writeLock.lock()
//写锁lock的时候,其会调用到AQS的acquire,本质就是一个独占锁或者说使用锁的独占模式
public void lock() {
sync.acquire(1);
}
//通过独占的形式先尝试获得锁,获得不了就排队等待
//这段代码与ReentrantLock的lock相同,只是tryAcquire的实现有些许区别
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//写锁的tryAcquire(或者理解为读写锁在独占模式下的tryAcquire)
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
//这里是做了位操作,获得C的低16位
int w = exclusiveCount(c);
//c是由读锁和写锁同时构成,c!=0有三种情况:
//1. 读锁不为0,当前有在读的线程
//2. 写锁不为0,当前有在写的线程
//3. 读写锁均不为0,当前有正在读和写的线程(肯定是同一个线程)
if (c != 0) {
//如果写锁为0,那么读锁肯定不为0,此时因为有在读的线程,所以无法写入
//这里我们可以看到锁是不支持升级的,也即当有线程在读取的时候是不能写入的。不能从读取锁升级为写入锁(无论正在读取的线程是不是当前线程)
//或者 首先会走到或者代表w!=0,也即当前有线程在写入,如果当前写入线程不是自己,也返回失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//走到这里代表w!=0,同时当前在写入的线程是自己
//这里是对可重入的判断,我们说过,16位最多表示65535,因此这是在判断独占锁可重入次数大于65535的错误
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//走到这里代表着:
//1.没有读线程只有写线程,且写线程是自己
//2.可重入的计数器没超过65535
//其实走到这里就是重入,而且一定是重入,因为w!=0
//更新c的值
setState(c + acquires);
return true;
}
//走到这代表c=0 因为c!=0的情况都判断完了。也即当前读锁和写锁都没人用
//判断当前线程是否需要阻塞,如果不需要阻塞就直接返回false代表获取锁失败
//如果不需要阻塞,将c值更新 如果更新失败返回false代表锁获取失败
//否则代表锁获取成功,将当前线程设为独占线程,返回成功
//这里可能有人有疑问,c都已经等于0了,那直接获得锁不就行了,咋还需要判断是不是需要挂起
//原因很简单,有可能其他线程一起进来竞争写锁,你刚才读到的c=0不代表现在还是
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
其中writerShouldBlock()
的判断就是公平锁 与非公平锁的区别:
//公平锁会执行hasQueuedPredecessors,如果当前节点是真实头结点就不需要挂起,否则就得挂起
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
//非公平锁获取锁的时候不需要考虑顺序,直接返回false直接获取就可以了。
final boolean writerShouldBlock() {
return false;
}
可以看到公平锁还是得先判断当前节点是队列头节点才允许获取锁,非公平锁直接获取。
acquireQueued()
与addWaiter()
、selfInterrupt()
上面均讲过(在ReentrantLock一节)。
3.7.2 writeLock.unlock()
//写锁释放的时候会调用AQS的release函数 与独占锁相同
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//尝试释放锁,如果释放成功就判断当前节点是否有后继节点需要唤醒,如果需要唤醒就唤醒后继节点
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//尝试释放锁
protected final boolean tryRelease(int releases) {
//如果释放锁的线程不是当前独占的线程就抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//将c值减1 由于写锁是低16字节,因此直接-1即可
int nextc = getState() - releases;
//通过位操作判断当前写锁是否等于0,等于0就代表当前线程执行完了,可以换下一个线程了
boolean free = exclusiveCount(nextc) == 0;
//如果写锁state==0代表当前写锁已经没有人占用,将独占的字段清空
if (free)
setExclusiveOwnerThread(null);
//同时更新state字段
setState(nextc);
//返回当前锁是否释放
return free;
}
后续锁释放流程与独占锁相同,不再赘述。
3.7.3 readLock.lock()
//调用共享锁的lock
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
//尝试获取锁
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
//如果写锁不等于0并且写锁的使用线程不是当前线程,那么直接返回失败
//如果是当前线程则代表写入的线程也可以同时执行获得读锁读取
//但是读取的时候是不能写入的
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//会走到这代表可以获得读锁 要么是写锁为空要么是写锁的独占线程就是当前线程
//获得读锁的值
int r = sharedCount(c);
//如果读锁不需要挂起并且当前读锁没超过65535并且更新读锁的数据成功就获得锁
//否则执行fullTryAcquireShared操作,不断循环获得锁
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//关于if里面的东西我们下面再单独说
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//fullTryAcquireShared函数我们下面分析
return fullTryAcquireShared(current);
}
通过之前的学习我们基本知道锁的状态位state其实就是一个计数器,在获得锁的时候计数器+1,释放锁的时候计数器-1。共享锁不同于独占锁,独占锁在获取锁的时候,会有个exclusiveOwnerThread变量标识当前持有锁的是哪个线程,这样当某个线程在执行lock.unlock()
时会先判断它是不是真的持有锁的那个线程,如果不是就抛出异常。但是共享锁不同,共享锁会有很多线程在持有锁,如果此时某个线程执行lock.unlock()
该如何记录这个线程是不是真的曾持有共享锁,以及如何修改计数器的值呢?因此我们需要一个信息,这个信息是记录当前线程是不是真的持有共享锁以及如果持有共享锁则持有多少个(可重入下每次重入都会+1),上面if语句的作用就是做这个的,要理解if语句内的内容,需要先看下Sync内部的属性
//当前线程持有的读锁的数量(可重入)
private transient ThreadLocalHoldCounter readHolds;
//保存最后一个获取读锁线程持有的读锁的数量(可重入)缓存的作用
private transient HoldCounter cachedHoldCounter;
//第一个获得读锁的线程(第一个获得读锁的线程)
private transient Thread firstReader = null;
//第一个获得读锁的线程持有的读锁的数量(可重入)
private transient int firstReaderHoldCount;
其中ThreadLocalHoldCounter如下:
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
可以看到ThreadLocalHoldCounter继承自ThreadLocal,而HoldCounter类如下:
static final class HoldCounter {
//持有的读锁数
int count = 0;
final long tid = getThreadId(Thread.currentThread());
}
HoldCounter内的count是一个计数器,计的是持有读锁的数量。ThreadLocalHoldCounter是HoldCounter和ThreadLocal的结合体,因此指的就是当前线程持有读锁的数量,知道ThreadLocal的同学可能会明白,ThreadLocal是线程安全的,ThreadLocal作用域是线程,这样每个线程都持有一份自己的HoldCounter(对ThreadLocal不了解的推荐一篇文章正确理解Thread Local的原理与适用场景)。
那么Sync内的readHolds属性就很好理解了,readHolds就是当前线程持有读锁的数量,每次加锁成功的时候count就会+1,释放锁的时候count就会-1。那么cachedHoldCounter、firstReader和firstReaderHoldCount是干什么的呢?
其实这些都是缓存,只有一个作用就是提升性能。这时我们就得问一下,提升了什么性能?怎么提升性能的?
刚才我们说到readHolds是当前线程持有的锁,但是ThreadLocal的底层实现是Map,也即每个线程通过readHolds拿自己的HoldCounter是通过Map的形式获取的,这样性能可能有点低。
与此同时,读锁的使用有一个比较普遍的场景:读锁的获取往往会紧跟着该读锁的释放。换句话说,读操作会很快很快,往往短时间发生的都是同一个线程的获取锁和释放锁操作。再换句话说,执行 unlock 的线程往往就是刚刚最后一次执行 lock 的线程,中间可能没有其他线程进行 lock。
cachedHoldCounter保存最后一个获取读锁的线程的HoldCounter,什么是最后一个获取读锁的线程?其实就是最新获得读锁的线程,也即cachedHoldCounter记录的是刚刚获取读锁的那个线程。上面说了,锁的获取和释放往往是紧接着发生的,既然锁获取的时候利用cachedHoldCounter记录了获取线程的HoldCounter,那释放的时候直接利用cachedHoldCounter来做count--不就好了,不需要通过ThreadLocal再从Map中获取一遍,而这就提升了性能。
firstReader和firstReaderHoldCount的作用与cachedHoldCounter相同,只不过firstReader和firstReaderHoldCount记录的是第一个获取读锁的节点的信息,作用也是相同的,就是缓存线程的count信息。
cachedHoldCounter是最后一个获取读锁的线程的HoldCount信息,而firstReader和firstReaderHoldCount是第一个获得读锁的线程的信息。也即Sync其实对这两个节点做了一些特殊处理,用一块内存单独缓存了这两个节点的HoldCount信息,提升了性能。
但是需要注意的是cachedHoldCounter和firstReader、firstReaderHoldCount是会变化的,比如当前获得锁的线程不是上一个线程,cachedHoldCounter就得更新为这个线程的信息。如果获得锁的第一个线程释放了锁,firstReader、firstReaderHoldCount也要跟着更新。
了解了上面这些信息,我们再来看之前的那段if代码:
//会走到这代表获取读锁成功,既然获取读锁成功就得更新自己线程的count值
//r==0代表当前没有在读的线程,那就代表当前线程是第一个获得读锁的线程
if (r == 0) {
//因此将当前线程赋给firstReader并将firstReaderHoldCount置为1
firstReader = current;
firstReaderHoldCount = 1;
// 否则 r!=0 代表当前有线程正在读,如果自己是firstReader,也即走到这代表第一个获得读锁线程的重入
} else if (firstReader == current) {
//那直接更新firstReaderHoldCount值即可
firstReaderHoldCount++;
//否则 走到这代表 r!=0并且自己也不是第一个获得读锁的线程
} else {
HoldCounter rh = cachedHoldCounter;
//那就判断下自己是不是最后一个获得读锁的线程呢?也即cachedHoldCounter内缓存的count是不是自己线程的
//如果不是的话,就通过readHolds从ThreadLocal的Map中取自己的HoldCount
//取完后要重新将cachedHoldCounter设为自己的HoldCount,因为自己现在是最后一个获得锁的线程了。
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
//更新当前线程的count值
rh.count++;
}
我们再来看下读锁下公平锁和非公平锁的区别:
同样,公平锁和非公平锁的不同也就是readShouldBlock的实现:
//公平锁会执行hasQueuedPredecessors,如果当前节点是真实头结点就不需要挂起,否则就得挂起
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
//非公平锁会调用AQS的apparentlyFirstQueuedIsExclusive
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
公平锁的实现比较简单,与写锁的实现一致,都是走hasQueuedPredecessors()
函数,判断当前线程节点是头节点后才允许获取锁。非公平锁会相对复杂,在理解非公平锁代码前我们先说下如果只是像写锁的非公平锁一样返回false会发生什么:
之前我们讲过读写锁虽然表面上是两个锁,但实际上是一个锁,锁的状态位state被拆成了高低16位使用。但是锁内部的CLH队列可没有拆过,也即因为获得不了读锁或获得不了写锁的线程可都在同一个CLH队列里面。
假设现在有这样一种情况:
当前读写锁中,线程1和线程2都获得了读锁在进行读操作,此时线程3想获得写锁,由于读写锁不支持锁升级,那么申请写锁的线程就会进入CLH队列挂起等待。此时线程4想要申请读锁,那要如何处理呢?该不该允许线程4获得读锁然后执行读操作呢?
如果readerShouldBlock()
直接返回false,那么代表线程4可以获得读锁,因此现在就变成了线程1,2,4都获得了读锁执行读操作,线程3由于读锁未完全释放因此还在CLH队列内等待。但是如果线程4之后又来了n多个线程都获取读锁,那么他们都可以直接获得锁,这样线程3还是无法满足获得写锁的条件,也即后来的读锁插队到了之前的写锁前面,优先获得了读锁,这会造成锁饥饿。
因此为避免上述的情况,非公平锁读锁的获取必须要先判断当前CLH队列内是否有立马可获得写锁的线程。注意是立马可获得写锁的线程,不是CLH队列内是否有要获得写锁的线程,什么是立马可获得,就是当前头节点的继任节点,也即次头节点(看来锁的设计者还是考虑了性能问题,在锁饥饿与性能之间做了一个中间方案)。因此代码
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
其实就是在判断当前头节点的继任节点是不是一个获取写锁的线程,如果是就让步,让人家先得到锁,如果不是,那不好意思,我先获得读锁。
下面我们再来谈下fullTryAcquireShared()
函数,希望你还没有忘,如果忘了可以看下tryAcquireShared()
的最后一行。
在分析fullTryAcquireShared()
前我们先分析下什么情况下会执行fullTryAcquireShared()
:当前线程具有获得读锁的权限(没有写线程或者写线程就是自己线程),但是获得读锁失败了,失败的原因通过
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT))
可以分析出原因有三:
- 执行readerShouldBlock返回true,也即当前读线程需要挂起不能获得读锁,readerShouldBlock的实现我们在公平锁和非公平锁已经讲到了
- 当前读锁的计数值已经超过65535了
- 执行CAS操作修改读锁的计数失败
这时我们再来看下fullTryAcquireShared()
的源码:
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
//for 循环不断尝试
for (;;) {
int c = getState();
//如果有写线程
if (exclusiveCount(c) != 0) {
//并且写线程不是自己线程,那就返回失败,不能获得锁
if (getExclusiveOwnerThread() != current)
return -1;
//else if分支的理解见下面的注1(因为要写的太多,这里写不下)
//建议先看注1再看下面的代码
} else if (readerShouldBlock()) {
//如果可重入的是第一个获得读锁的线程,那重入时可以继续获得读锁,执行下面30行的CAS操作
if (firstReader == current) {
} else {
//下面的逻辑就比较简单,就是尝试更新cachedHoldCounter的值
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
//如果count==0其实就是代表当前线程没有被readHolds记录过,也就代表当前线程之前没有获得过锁
//也就是说,当前线程不是重入线程,我们在注1讲了,else if分支只处理重入的线程
//因此这种情况下的线程没有记录的必要,从readHolds中删除
if (rh.count == 0)
readHolds.remove();
}
}
//同理,count==0代表不是可重入线程,既然不是可重入又shouldBlock,那么就乖乖排队吧
if (rh.count == 0)
return -1;
}
}
//对上面else if分支的补充,其实上面else if分支就是为了确保读锁重入操作能成功,而不是被塞到阻塞队列中等待
//处理获得读锁的数量溢出情况
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//能走到这代表读锁数量没有溢出并且当前线程是可重入线程,那么下面就是对可重入线程对锁的获取
if (compareAndSetState(c, c + SHARED_UNIT)) {
//走到这代表CAS操作成功,也即代表获取锁成功
//下面就是修改线程的HoldCount中的count值,内容与tryAcquireShared相同,不再赘述
//这里主要提醒的一点是,如果CAS不成功,因为fullTryAcquireShared是for(;;)的形式,因此会不断的尝试
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh;
}
//获得锁后返回1
return 1;
}
}
}
注1:
else if (readerShouldBlock()) {
//...
}
会走到上面那里代表没有写线程 exclusiveCount(c) == 0
,但如果此时readerShouldBlock()
返回true则代表当前获取读锁的线程应该阻塞。既然没有写线程那么非公平锁下readerShouldBlock()
一定返回的是false,因此要想readerShouldBlock()
返回true,必然是公平锁返回的结果。公平锁会返回true,因为当前CLH队列内有在排队的线程,当前线程不是CLH的头节点。但是这样就会有个问题:如果我现在想获得读锁的线程是一个重入线程,就是我之前获得过读锁了,我现在还想获得读锁,你不能因为我不是CLH队列的头节点就不让我获得锁,因此上述else if
分支核心就是处理不是CLH队列的头节点但是之前获得过读锁,现在是可重入获取的情况。
3.7.4 readLock.unlock()
//读锁的释放,会调用AQS的releaseShared
public void unlock() {
sync.releaseShared(1);
}
//AQS中的共享锁释放
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
//锁释放的逻辑比较简单,就是先修改线程对应的HoldCount的count值,再修改锁状态位state的值
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//如果当前线程是第一个获得读锁的线程,就使用缓存的firstReaderHoldCount来修改count值
if (firstReader == current) {
//如果firstReaderHoldCount等于1,再减1就是0了,也就代表当前线程释放了读锁,那么就释放firstReader
if (firstReaderHoldCount == 1)
firstReader = null;
else
//否则直接-1就可以了
firstReaderHoldCount--;
} else {
//这里代表当前线程不是第一个获得锁的线程,此时就需要判断cachedHoldCounter有没有命中
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
//到这代表cachedHoldCounter没有命中,此时就需要从readHolds中取HoldCount
rh = readHolds.get();
int count = rh.count;
//这里是对count的校验,如果count<=1就代表当前线程不再持有锁,那么就从readHolds将当前线程对应的HoldCounter删除
if (count <= 1) {
readHolds.remove();
//如果count小于等于0,则代表释放锁的线程其实并不持有锁,这时就属于一个违法的锁释放,此时需要抛出异常
if (count <= 0)
throw unmatchedUnlockException();
}
//正常情况下对HoldCount做--操作
--rh.count;
}
//CAS操作,修改锁的状态位的值
//加for(;;)避免CAS失败,多次重试
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
doReleaseShared()
内容在CountDownLatch相同。