JUC源码详细分析之Part3——《常见的线程安全集合》
本系列笔记笔者将会详细的讲解JUC中核心类的源码,其中基于的JDK版本为
1.8.0_291
。由于JUC内容很多,因此笔者将笔记拆为四部分:另外,本系列笔记部分内容参考自《深入浅出 Java Concurrency》
4. 常见的线程安全集合
本章主要讲JUC下部分线程安全的数据结构,我们在日常开发中会经常使用他们,但是关于他们的实现原理与源码很多同学可能没了解过,现在就带大家了解JUC下常用数据结构的源码。
4.1 ConcurrentHashMap
我们之前利用读写锁实现了一个线程安全的HashMap,但是这个Map的性能并不高,因为首先他只支持同一时间一个线程的写入,另外读取的时候也需要加锁读取,在写的时候不允许读取,我们希望一个效率更高的map来替换之前读写锁版本的map,至少他需要改进两个功能:
- 可以支持一定量的并发修改
- 读取的时候不需要加锁
在理解ConcurrentHashMap前,我们默认读者对HashMap的实现是有一定了解的,有HashMap源码的经验,包括JDK1.7和JDK1.8版本不同的实现方式。
同样,ConcurrentHashMap的实现在JDK1.7和JDK1.8版本中也有所不同,我们先来说JDK1.7版本中ConcurrentHashMap的实现。
4.1.1 JDK1.7 ConcurrentHashMap
在JDK1.7中,ConcurrentHashMap的实现采用了锁分段的思想(JDK1.8也是,不过锁的颗粒度不同)。首先我们知道JDK1.7中HashMap是一个数组加链表的结构,假设我们将数组进行拆分,拆成n份,那么其实每一份是一个更小的数组,或者说每一份是一个更小的HashMap。我们将每一份小的HashMap称为段(Segment)。那么如果我们在操作的时候,先定位到使用哪个段,然后再在段内进行线程安全的增删改查。也即,每个段都是彼此并行的,段与段没有交集,这样最多可支持并发为n的写入/删除操作(n为段的数量)。
如:当前Map数组长度2n,将Map分为n个段,那么每个段的数组长度就是2,每个段都可以认为是个小的HashMap,段与段之间并行,段内的元素操作都是线程安全的。
JDK1.7中ConcurrentHashMap的实现,其结构如下:
首先ConcurrentHashMap内包含一个segments,即段数组,它包含16个段,每个段都是一个类HashMap结构,这个结构(Segment)持有一个独占锁,保证在对段进行修改操作时是线程安全的。另外哈希桶的实现(HashEntry)的实现是链表的方式,即相同hash值的节点通过链表串起来(与JDK1.7相同)。
在这里我们引入了段,为了提高map修改时的并发能力,但同样也带来了一个问题:当我们要操作一个元素时,如何定位这个元素在哪个段上,即原来我们只需要定位哈希桶,然后遍历链表节点,现在需要变为段->哈希桶->节点。JDK1.7中的实现依然是通过hash,即先通过hash定位到段,再通过二次hash定位到哈希桶(hashEntry),然后再通过遍历链表得到具体的节点。
了解了上面这些实现,我们可以从另一个角度看一下ConcurrentHashMap。首先ConcurrentHashMap内部有一个HashMap,这个HashMap的key是段号,value值是Segment。同时Segment也是一个HashMap,因此我们可以认为ConcurrentHashMap是一个value为HashMap的HashMap,这样也就不难理解进行元素操作时需要二次hash了。
我们先来看下JDK1.7中ConcurrentHashMap是如何通过key获得段的:
首先会将key值的hashCode(每一个Java Object都有hashCode)通过hash()
进行二次编码(哈希算法打散均匀),然后将得到打散后的hash值通过segmentFor()
函数,就可以定位到key对应的segments数组对应的数组下角标,返回数组下角标出处的Segment即可。熟悉HashMap的同学不难发现这与HashMap获得哈希桶的方式一模一样。
与HashMap中的哈希桶数组不同,segments不支持resize操作,也即segments的大小一旦初始化无法修改。
了解了这个后我们再来看下JDK1.7下的ConcurrentHashMap的增删改查:
get
我们之前已经讲了如何获得Segment,这里的get操作是获得Segment后的get,也即这里的
get()
是Segment对象的函数,而非CouncrrentHashMap的函数,下面讲的put()
、remove()
同理,不再赘述。这里唯一比较特殊的是
readValueUnderLock(e)
处的代码,在读到value为空时,还要加锁再读一遍,这是为什么呢?这与value的修改对当前线程还不可见有关,我们会在下面的put操作时详细解释。put
可以看到put整个操作都是加锁的,因此针对一个段的修改是线程安全的,这里我们就需要讲下在
get()
操作时留下的那个问题:在读到value为空时,还要加锁再读一遍,这是为什么呢?这与
tab[index] = new HashEntry<K,V>(key, hash, first, value);
这段代码有关。这段代码做的是将当前元素构造为HashEntry并设为整个链表的头结点,然后将这个新的链表加入到数组中。这里会存在一个重排序的问题:理论上,我们是先new了这个HashEntry,然后将数据赋给HashEntry对象,再将这个HashEntry赋给tab[index],即new HashEntry() -> 给 HashEntry赋值 ->设置进tab[index],但编译器有可能会对这个顺序进行重排,重排后顺序为:new HashEntry() ->设置进tab[index]-> 给 HashEntry赋值 。这样就会存在一个问题:如果我们将HashEntry设置进tab[index]后,设置了HashEntry的key但还没设置value时,此时进来了查询语句,想象会发生什么?查询会查到这个key值对应的value是null,因为我们还没来得及赋值。因此在get操作时,如果读到的value是null,执行了
readValueUnderLock()
函数,这个函数会加锁,如果此时我们正在写入,那么这个函数会等到写入完后再重新读取一遍value。remove
删除操作需要提及的是
这段代码,对于为什么if我们在注释中已经解释了,这里需要解释的是:如何从链表中删除一个节点?ConcurrentHashMap会先将当前节点的继任节点取出作为新链表的头结点,假设这样形成的链表我们命名为C,然后从原来的头结点开始遍历,每遍历一个节点都将这个节点设为链表C的头结点,直到遍历到当前节点为止,这样得到新的链表赋给原来的tab[index]即可。流程大概如下图:
假设我们要从链表B1->B2->B3->B4->B5 中删除删除B3,那么先将B4设为新链表的头结点,即B4->B5,然后从原来的头结点开始遍历,即B1,每遍历一个节点都将这个节点设为新链表的头结点,此时新链表就是B1->B4->B5。 然后是 B2->B1->B4->B5,此时新的链表形成,将这个新的链表赋给table[index]。
这里大家可能会有几个问题:
为什么要这么做?
在其他链表结构中,如果删除链表中的节点,很可能是B3->pre->next = B3->next即可。但ConcurrentHashMap内的元素节点没有pre指针,另外他们的next指针都是final的,这样,链表结构如果变动只能重新生成节点。
如果我们在删除节点的时候发生了读事件,读到的会是什么情况?
如果新的链表还没设置进table,那么读取时会用旧的链表来读取,自然读到的是旧数据。也即旧的链表在新的链表生成过程中是依然存在的,这样做可以保证在删除时读操作依旧能读到数据,只不过可能不是最新的。旧的链表会在我们将新链表设置进table[index]后失去引用,然后被GC回收。
这样我们就完成了ConcurrentHashMap的增删改查操作,这里还有一些需要注意的细节:
当我们对一个段进行操作时,对段的修改操作由于加了锁因此保证同时只能有一个线程在修改段,但读取是没有加锁的,因此可以有多个线程同时读取段。为了让修改线程修改的内容能被读取线程看到,一些字段就需要设置为volatile,如Segment中的count,table属性,HashEntry中的value属性。在对volatile字段进行更新时,代码基本上都是先建立一个临时变量指向volatile变量,然后读写操作就在这个临时变量中进行,最后再将这个临时变量赋值给volatile变量。
如:
这样做的原因是多次读取volatile类型的开销要比非volatile开销要大,而且编译器也无法优化,因此建立临时变量,这样多次读写tab的效率要比volatile类型的table要高,JVM也能够对此进行优化。
从这里我们也看出JDK源码编写者们对代码编写的严苛程度。
4.1.2 JDK1.8 ConcurrentHashMap
JDK1.8中也采用分段锁的思想,不过JDK1.8取消了段的概念,加锁的颗粒度设为了每一个哈希桶(也可以理解为HashMap数组中的每一个元素都是独立的加锁),另外JDK1.8中采用synchronized和CAS乐观锁来代替JDK1.7中的独占锁保证线程安全性。当然与HashMap一致的是JDK1.8中ConcurrentHashMap也引入红黑树。
JDK1.8中ConcurrentHashMap的实现极其复杂,其源码有6000多行,内部类有50多个,本篇笔记也只是对JDK1.8的ConcurrentHashMap浅析,主要讲一下常用API的源码:
put
上述便是put流程的主要代码,我们可以做个流程总结:
- 如果key或value为null,抛出异常,否则执行步骤2
- 如果数组为空,则初始化数组,然后执行步骤2,否则执行步骤3
- 如果key所对应的Node不等于空,执行步骤4。否则创建一个Node并插入到哈希数组中,如果插入成功执行步骤7,如果插入失败则执行步骤2。
- 如果哈希正在扩容则帮助扩容迁移,然后执行步骤2,否则执行步骤5
- 如果Node不是头节点了,执行步骤2。否则如果Node是链表将节点插入到链表尾,如果Node是红黑树则将节点插入到红黑树中,然后执行步骤6
- 如果刚才插入到的是链表尾部且链表长度大于等于8则执行扩容或链表转红黑树操作,然后执行步骤7
- 更新记录并判断是否需要扩容,如果需要就扩容
上面有几点比较重要的我们需要细说:数组初始化,扩容,帮助数据迁移
数组初始化
链表转红黑树时会发生扩容的可能
4.2 ConcurrentLinkedQueue
ConcurrentLinkedQueue是一个链表实现的线程安全的队列,队列内元素的顺序就是入队列的顺序,队列头是队列内时间最久的,而队列尾就是队列中时间最短的。
通过我们前面对锁的学习,可以设想下实现一个线程安全的队列有几种方法:
- 使用synchronized来同步队列
- 使用Lock,用读写锁或独占锁。
- 使用乐观锁CAS和不断循环的方式。
ConcurrentLinkedQueue是通过第三种方案CAS来实现线程安全的。
ConcurrentLinkedQueue内部的属性比较简单,主要的属性只有两个,节点头和节点尾
其中Node类结构更加简单,只有value和next指针,这里可以看到ConcurrentLinkedQueue是一个单向链表。
正常的增加和删除节点只能操作头结点和尾节点,如果想操作中间节点必须要遍历这个队列。
这里所有的属性都是volatile类型,因为ConcurrentLinkedQueue是非阻塞的,需要保证写入可以被后续的读取看到,同时避免重排序。
队列的头节点是无意义节点也即伪节点,当类初始化的时候,会初始化一个无意义节点赋值给head和tail指针,但当插入一条数据后会修改tail指针的值,将tai指针变得有意义。也即head指针一直都是无意义的,当队列内有数据时tail指针就是真实的最后一个数据,但当队列内无数据,tail=head=伪节点。
队列的插入:
todo:看不懂里面的逻辑
出队列:
遍历数组的大小
4.3 阻塞队列 BlockingQueue
阻塞队列就是会对元素的入队和出队操作在不满足条件时进行阻塞。比如如果队列在无元素时执行出队列操作,那么会一直阻塞到队列内有可出队列的元素为止。
阻塞队列有两个方法 put
和take
即阻塞的入队和出队操作。阻塞队列的使用场景非常普遍,最常见的就是生产消费模型,生产者生产信息往队列中put,当队列满了就需要阻塞,消费者消费数据就是从队列中take,如果队列内无可取的元素就阻塞直到有元素。
在JUC中对于阻塞队列有几种不同的实现,下面我们一一看下。
4.3.1 LinkedBlockingQueue
LinkedBlockingQueue内部属性如下:
Node节点的信息如下
LinkedBlockingQueue内部维持了一个线程安全的count,这不像ConcurrentLinkedQueue需要遍历整个队列得到size。另外LinkedBlockingQueue内部维护了两个锁,一个是take时的锁一个是put时的锁。在ConcurrentLinkedQueue中我们知道,其实出入队列基本是相互分离不干扰的,出队列就是操作head,而入队列是操作tail,因此我们可以认为take和put操作是互不干扰的,也即可以并行执行的。通过两个锁保证入队和出队不互相影响,提高了吞吐率。
入队列:
这里需要考虑的一点是为什么
有这样一段代码。为什么我们在入队的时候还需要唤醒其他入队挂起的线程,正常来说入队的线程不是应该由出队线程唤醒吗?
这是因为出队线程出队成功后,判断如果之前队列是满的,确实会执行唤醒操作,但执行的是signal()
而非signalAll()
(一会我们会看到)signal()
存在一个问题是可能不一定成功唤醒这个线程,如果没有成功唤醒,那么这些线程还在挂起,此时入队的线程如果发现可以继续入队,那么他也会尝试唤醒,这样避免了因为出队线程signal()
不成功而入队线程挂起过久的情况。
我们来看下signal()
为什么可能唤醒不成功?
假设我们现在有两个线程A、B正在被条件变量挂起,但他们的条件不同,如:
此时线程C执行signal()
操作,JVM会随机选择一个挂起的线程唤醒,假设此时选择了B线程唤醒,但因为x是7,所以B在唤醒后会继续await()
挂起,但此时线程A因为没接收到通知,还在挂起等待唤醒,即使A应该醒来了。
还一种情况是,我们唤醒了生产者B线程,但此时B抛出了异常,那么按理挂起的A可以接着生产,但因为没有被通知,还是会被挂起。
因此这里多一个signal()操作可以避免线程唤醒失败导致其他挂起线程等待过久的情况。另外为什么选择用signal()
而不用signalAll()
?如果使用signalAll()
全都通知到,似乎就不存在上面说的问题了。
假设此时队列是满的,因为队列满挂起了10000个生产者线程,然后此时消费者消费了一个元素,执行signalAll()
操作,想象一下会发生什么?10000个线程都会醒来,为了这一个可生产的位置不断的尝试抢占锁,但最多只有一个线程能获得锁,其他线程又会被再次挂起,这样就会带来大量的上下文切换消耗,这一现象有个比较专业的名词叫惊群
出队列:
这里比较有意思的一点是在删除的时候,将真实的头结点变为新的假头结点,而不像更简单的:fakeNode->next = fakeNode->next->next
这种操作,这种操作看起来更简单,一直保留这个假节点,删除的时候将真实节点从中删掉就好了,这样更方便不是吗?
想象这样一种情况,当前队列内只有一个元素,那么我们队列的结构就会如下:
如果先执行take再执行put操作时,take操作按照fakeNode->next = fakeNode->next->next
,那么就变成
take操作时,因为队列内只有一个元素,那么take之后fakeFirst->next = null
,此时队列情况为:
当再执行put操作时: last.next = node;
。但此时last指针的指向是NULL,last.next会报NullPointerException
。
因此,这里删除选择的是替换新的伪节点的方法。这样在队列内没有元素时,last->next = newNode
,这个last指针和当前first指向同一个节点,那么也就代表fakeFirst->next = newNode
(保证在队列为空时,head和last都指向fake节点)。
4.3.2 ArrayBlockingQueue
实现队列的方式无非两种:链表和数组。上面我们讲的是链表实现了一个阻塞队列,这里我们讲如何通过数组实现一个阻塞队列。
在讲数组实现阻塞队列前我们先讲下数组如何实现一个FIFO队列:
- 首先我们需要两个指针,分别是header和tail指针,由于数据结构是数组,因此我们的指针只需要是数组索引(下角标)即可。
- 当出队列一个元素时,header指针向后移动(索引+1),并返回旧header索引处的值,然后将旧heaer索引处的值设为null
- 当入队列一个元素时,tail指针向后移动(索引+1),将新元素添加到tail指针指向的新位置。
- 如果header或tail移动到了数组的尾巴,再移动时就从索引0开始,这样就形成了一个环形数组。
- 如果数组满了,那么就拒绝入队或扩容数组。
其代码如下:
有了这个基础,我们再来看ArrayBlockingQueue的实现,其属性如下:
入队:
出队:
4.3.3 PriorityBlockingQueue
JDK中有一个数据结构叫PriorityQueue,即优先队列,这是一个有序的队列(部分有序),使用者在创建的时候需要指明排序方式(或元素自身实现了Comparable)。每次入队的时候,都会排序将元素插入到正确的位置,出队的时候出的是队列内的第一个元素(即排序后最小的元素)。理解PriorityBlockingQueue前我们先实现PriorityQueue,这会使我们更容易理解PriorityBlockingQueue的源码。
PriorityQueue本质是一个完全二叉树(二叉堆),且树上的任意一个节点都小于其子节点,因此这样保证了每次出队的时候都是出树顶的数据。PriorityQueue是通过数组实现的,我们可以对二叉树节点进行编号,从上到下,从左到右,那么每一个节点的编号其实就是其所在数组的索引,这样我们就能通过数组索引来访问对应的节点。数组第0个元素就是树顶,其子节点是第1个和第2个元素,第1个元素的子节点是第3和第4个元素,以此类推,第n个元素的子节点是 2n+1和2(n+1)。同样,第n个节点其父节点是(n-1)/2。
有了这些概念,我们来看下PriorityQueue是如何入队的:
当一个元素入队时,我们会先假装他现在在队列的末尾,然后比较当前节点和父节点的大小,如果小于父节点就将自己和父节点交换,重复此过程一直到当前节点不小于父节点或当前节点是头结点为止,我们将这一流程称为“上浮”。即新节点会不断和当前父节点进行比较,如果小于父节点就会上浮。
再来看PriorityQueue是如何出队的:
我们知道出队的必然是树顶也即数组0位置的元素,但树顶出队后就需要选拔出新的树顶来占据数组0这个位置,这样又会空出来一个位置,依次类推,会不断的变换树的形状。但无论怎么变,出队都是做了-1操作,即假设当前树节点共n+1个,出队后树节点还剩n个,数组内的很多元素都会移动,最终数组0位置的元素会被新的填充,数组n位置的元素会被清除。这样一看,其实出队就是我们从当前数组中选拔出新的位置0的元素,并且将数组n位置元素安排到恰当的位置。因此出队的逻辑如下:
我们先取出数组n位置的元素,命为n,将当前出队的树顶元素得到,命为x。得到x的两个子节点,如果两个子节点都大于n,那么就将n与当前x交换,出队完成。否则如果这两个子节点有比n小的,就将x与较小的那个子节点交换,重复获得当前x的子节点,再次比较,如果一直交换到x不再具有子节点(最后一层),就将x于n交换。这样在树顶元素x与其子节点不断交换的过程中,我们可以成为“下沉”。
其入队和出队的代码如下:
有了这个基础后我们再来看JDK中PriorityBlockingQueue的实现:
PriorityBlockingQueue类在PriorityQueue中多加了一个独占锁和一个信号量,独占锁是为了保证线程安全的,而信号量是保证在队列为空时出队进行阻塞。另外因为PriorityBlockingQueue是无界集合,即会自动进行扩容,所以不存在入队时的挂起,只会有出队时的挂起。
入队:
出队:
阻塞出队:
4.4 CopyOnWriteArrayList
CopyOnWriteArrayList的源码是很简单的,但是要看懂源码需要先了解CopyOnWrite思想,这一思想及其的重要,其使用场景也非常广泛,如OS下fork一个子进程,Redis的RDB,Kafka的客户端内存缓冲区。
在说COW前我们先说下如何实现一个线程安全的List集合,比较简单的方法就是加锁,如果想效率比较高可以加读写锁。但如果是读多写少的场景,我们加读写锁会有什么影响呢?首先我们知道读写锁的写写操作和读写操作是互斥的,当某个线程在写入的时候,不允许其他线程读取。这样假设我们在高并发读取的情况下偶尔插入一条写请求,那么其他读线程可能都会阻塞,这肯定不满足我们的需求的。我们更想要的是一种在读取的时候能够不阻塞,并且在写入的情况下也不会影响读取的List。
这就引入了COW,COW就是写时复制,它被广泛的应用于读多写少的场景。在写入的时候会拷贝一份原始数据的副本,然后在副本数据上执行修改(写入)操作,此时读取操作还是读的原始数据,并不会因为写入受影响,因此也不会阻塞读取。修改完成后再将原始数据替换为副本数据。然后Java下会通过volatile关键字通知数据被修改,保证读线程对修改的可见性。
但是需要注意的是,CopyOnWriteArrayList只适合读多写少的场景,因为在写入的时候,写写线程也是互斥的,同时每个写线程还会执行一次数据复制操作,因此更耗时。
下面我们看下CopyOnWriteArrayList的源码实现
首先CopyOnWriteArrayList的属性很少,关键的只有如下两个:
add 操作
remove操作
get操作