服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - JAVA核心知识之ConcurrentHashMap源码分析

JAVA核心知识之ConcurrentHashMap源码分析

2021-09-02 15:24yue_hu Java教程

这篇文章主要介绍了JAVA核心知识之ConcurrentHashMap源码分析,想了解ConcurrentHashMap的同学一定要看啊

1 前言

ConcurrentHashMap是基于Hash表的Map接口实现,键与值均不允许为NULL,他是一个线程安全的Map。同时他也是一个无序的Map,不同时间进行遍历可能会得到不同的顺序。在JDK1.8之前,ConcurrentHashMap使用分段锁以在保证线程安全的同时获得更大的效率。JDK1.8开始舍弃了分段锁,使用自旋+CAS+sync关键字来实现同步。本文所述便是基于JDK1.8。
ConcurrentHashMap与HashMap有共同之处,一些HashMap的基本概念与实现,本文不再赘述。

2 继承关系

?
1
2
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable

可以看到ConcurrentHashMap继承了AbstractMap及ConcurrentMap抽象类,并实现了Serializable接口,这说明ConcurrentHashMap是一个线程安全的标准Map,且允许序列化。与HashMap不同是ConcurrentHashMap不允许Clone。

3 构造方法

ConcurrentHashMap同样是采用懒初始化的方式,有实际元素时才进行容器的初始化。因此其构造方法与HashMap相差无几。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 无参构造
public ConcurrentHashMap() {
}
// 带有初始容量的构造
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); //求最小二次幂
    this.sizeCtl = cap; // sizeCtl 暂存容量
}
// 带有初始集合的构造
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
}
// 指定初始容量和装载因子的构造
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}

以上构造并没有什么特别的,逻辑也相对简单,不再详细解析,感兴趣的话可以到前言提到的HashMap篇了解。
除此之外,ConcurrentHashMap拥有另外一个值得注意的构造方法:
指定初始容量,装载因子以及并发级别的构造方法:
源码:

?
1
2
3
4
5
6
7
8
9
10
11
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) // 基础校验
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // 取初始容量和并发级别的最大值
        initialCapacity = concurrencyLevel; 
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);  // 指定了装载因子,因此就用初始容量和装载因子计算出实际要的初始容量
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size); // 确定计算出初始容量,
    this.sizeCtl = cap;
}

解析: 除了初始容量与装载因子外,此构造方法还有一个并发级别concurrencyLevel的参数。在jdk1.7时,并发级别作为分段锁的标准进行分段。但是jdk1.8开始舍弃了分段锁,为了版本兼容,此构造方法依然存在,但是concurrencyLevel也不再具有其分段依据的意义,而是作为初始容量的定义依据。

4 初始化

ConcurrentHashMap采用懒初始化的方式,在第一次putAvl时如果容器为空,则会调用initTable()进行容器的初始化:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {    // tab为空时就一直循环
        if ((sc = sizeCtl) < 0)       // sc小于0说明正在初始化或者正在复制,放弃CPU等下一次
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {    // 否则的话就用CAS把sizectl设置成-1,标识正在初始化,且成功的话
            try {
            // CAS之后再判断一下,这是为了避免并发,比如上面判断了tab为空,然后另一个线程做了初始化操作,结束时sc被设置为扩容阈值,然后继续(sc = sizeCtl) < 0,这时cs还是大于0,所以还是能走进来的,所以这里再判断一下
                if ((tab = table) == null || tab.length == 0) {  
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;     // sc最初始是存在的初始容量的
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];   // 建立一个容器
                    table = tab = nt; // 然后复制到table属性上
                    sc = n - (n >>> 2);   // sc等于 n>>>2就是n/4就是0.25n   sc = 0.75n,相当于设置扩容阈值
                }
            } finally {
                sizeCtl = sc; // sizeCtl赋值成扩容阈值
            }
            break;    // 只要走到这里面,那么说明一定已经初始化结束了,无论初始化是自己做的还是其他线程做的,然后就跳出
        }
    }
    return tab;   // 返回新的tab
}

解析: 容器初始化完成之前不断的进行循环。这是因为ConcurrentHashMap是一个支持并发的Map,可能同时会有多个线程进入initTable方法,但是只有一个线程执行初始化操作,那么剩下的线程就需要等待初始化完成再跳出initTable方法,以满足接下来的putVal操作。
为了保证只有一个线程执行初始化操作,使用sizeCtl来作为标识,sizeCtl为-1时即说明当前已有线程正在初始化,则放弃CPU继续循环。sizeCtl不为负数时,则使用CAS(通过Unsafe+偏移量的方式实现)将sizeCtl置为-1,如果当前线程成功的话则进入实际的扩容操作。
通过CAS锁定成功后再次判断容器是否为空,这是为了避免并发,比如上面判断了tab为空,然后另一个线程做了初始化操作,结束时sc被设置为扩容阈值,然后继续(sc = sizeCtl) < 0,这时cs还是大于0,所以还是能走进来的,所以这里再判断一下。
如果sc(sizeCtl原值)大于0,则以sc做为初始容量。这个在构造方法篇提到过,对于有初始容量要求的构造,会以sizeCtl暂存初始容量。否则的话就取默认的初始容量DEFAULT_CAPACITY(16)
接下来就建立目标容量的容器并赋值给容器属性table,计算该容量下的扩容阈值赋值给sizeCtlsizeCtl的赋值放到finally里面的原因是因为无论容器创建成功还是失败,都需要放开以sizeCtl为负值作为判断条件的锁,以保证在本线程创建失败的情况其它线程能继续竞争锁继续进行容器创建的工作。
至此,容器的初始化便完成了。

5 新数据载入-putVal

ConcurrentHashMap新数据载入主要通过putVal实现:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) { // 会一直循环,直到break
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();  // 初始化table
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {    // unsafe获取i处的数据,如果为null
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))    // cas操作,如果i处为null就构造新元素,成功就break  失败就继续循环
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)        // f处的元素hash是-1   -1意味着这是一个迁移节点
            tab = helpTransfer(tab, f);   // 辅助迁移开始
        else {
            V oldVal = null;
            synchronized (f) {    // 用f进行同步
                if (tabAt(tab, i) == f) { // 这里要保证i处还是f,而不是已经被其它元素并发占据了
                    if (fh >= 0) {    // f处的hash要大于等于0,这是为什么呢?除了MOVED还会有其它负值吗
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {  // 从e往后找,并且记录元素数量
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) { // 一系列判断Key是否相等,如果相等
                                oldVal = e.val;   // 记录旧值
                                if (!onlyIfAbsent)    // 不是仅允许新增的话
                                    e.val = value;    // 记录新值
                                break;    // 成功的话就跳出大循环
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {   // e向后移动,  如果移动之后是nul,就是在末尾的话
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);   // 建立一个新节点
                                break;     // 成功的话就跳出大循环
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {  // 如果是一个树化节点
                        Node<K,V> p;
                        binCount = 2; // 树化的节点的话固定元素数量为2,这是一个相对特殊的值,即会扩容又不会重复树化
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) { // 往树里面新增元素,如果存在就返回那个节点,不存在就是新增节点
                            oldVal = p.val;   // 记录旧值
                            if (!onlyIfAbsent)    // 不是仅允许新增的话
                                p.val = value; // 记录新值
                        }
                    }
                }
            }
            if (binCount != 0) {  // 如果元素数不为0的话
                if (binCount >= TREEIFY_THRESHOLD)    // 判断是否到了树化阈值
                    treeifyBin(tab, i);   // 到了的话进行树化
                if (oldVal != null)   // 覆盖操作直接返回旧值不会执行addCount
                    return oldVal;
                break;     // binCount != 0才跳出大循环,为0是tabAt(tab, i) != f导致的,即i处已经被其他线程覆盖了
            }
        }
    }
    addCount(1L, binCount);   //
    return null;
}

解析: 完成为空校验后,通过spread方法来计算出hash以确定下标位置,spread的计算方式为(h ^ (h >>> 16)) & HASH_BITS,HASH_BITS的值为0x7fffffff,描述出来就是将hashCode的高16位和低16位做异或操作,并保证最高位符号位为0(结果是一个正数),注解表示这样做的原因是为了使数据更加分散,尽可能的避免hash冲突。
如果容器tab为空或者长度为0说明容器未初始化,那么就调用initTable进行容器初始化。
否则的话对hash与现容量进行与运算得出数据应处的下标,判断此下标所在处节点是否为空,为空说明没有元素,直接创建一个新节点作为头元素放进去,这一步通过CAS操作实现,避免并发情况下另外的线程先一步完成头节点创建操作。
如果下标所在处节点不为空,说明该处已经有元素了,此时判断头节点的hash是否为MOVEDMOVED的值为-1。上面提到过,正常元素通过spread方法计算出来的hash值都会使正数。此处-1为一个特殊值,意味着此节点正在进行扩容迁移工作,那么此时就调用8 辅助扩容-helpTransfer方法进行辅助迁移工作。
如果节点hash不为MOVED,意味着这是一个正常节点,就执行元素载入工作,使用synchronized关键字实现同步, 同步块内开始还要使用tabAt(tab, i) == f判断进入同步后i处的节点还是原节点,接下来就是元素的载入工作了,整体和HashMap的流程是相差无几的,感兴趣的话可以去文首提到的解析HashMap的文章了解一下。synchronized块将元素载入节点后,接着会对binCount进行判断,binCount在节点还处于链表模式下的情况下记录节点在新元素载入后的元素总数量,此处判断binCount达到TREEIFY_THRESHOLD(8) 树化阈值后,就会对该节点进行树化操作。树化操作依然是通过synchronized来完成的,这里不做过多延伸。
以上整个新元素载入操作都是一个自旋的过程,一直到新元素载入成功后通过break跳出自旋过程。
新元素载入节点后,需要对count实时维护,count通过 6 维护与启动扩容-addCount 方法完成同步维护工作。

6 维护与启动扩容-addCount

ConcurrentHashMap通过addCount方法实时维护内部元素数量,并在达到扩容阈值的情况下启动扩容操作:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s; 
    if ((as = counterCells) != null ||    // counterCells不为空,如果没经历过并发肯定是空的,这个判断意思是维护count的过程中已经经历过并发,已经处于不使用baseCount维护的阶段了
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {       //这个比较唯一会出现false的情况就是b = baseCount执行之后,还没进行cas时,baseCount又被改了。意思是如果没有并发就用baseCount维护,并发导致baseCount维护失败就进入counterCells维护
        CounterCell a; long v; int m;
        boolean uncontended = true;   // 记录是否遭遇了并发
        if (as == null || (m = as.length - 1) < 0 ||      // as等于null意味着baseCount维护遭遇并发了,且是初次(非初次counterCells不会为null)  (m = as.length - 1) < 0说明counterCells长度为0,即没有实际内容
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||     // counterCells是一个分段计算的概念,这里就是随机找一个槽,如果这个槽位空的话
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {      // 槽不为空的话就用cas把槽里面的值赋值给v+a.val    如果失败的话
            fullAddCount(x, uncontended); //  用fullAddCount  就是LongAdder的模式将x放进去,初始化counterCells
            return;
        }
        if (check <= 1)       // check小于等于1(1表示是当前节点的第一个元素,-1标识是replace方法或者clear过来的,这些都不需要扩容) 
            return;
        s = sumCount();   // 计算一下现在的总量
    }
    if (check >= 0) {  //baseCount成功的情况且 check大于0,即不是replace或者clear方法进来的
        Node<K,V>[] tab, nt; int n, sc;
        /**
        1.s >= (long)(sc = sizeCtl)  s是总数量  s大于等于扩容阈值sizeCtl时,或者sizeCtl为负数,正在扩容时
        2.(tab = table) != null 并且容器tab不为空,意思就是表已经初始化完毕了,是执行扩容而不是执行初始化
        3. (n = tab.length) < MAXIMUM_CAPACITY)  并且容量还没有达到最大容量时
         */
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);  // 获得容量特征值:容量n的左0数量并将高位置1
            if (sc < 0) { // 如果sc小于0  意味这个正在进行并发扩容
                /**
                sc >>> RESIZE_STAMP_SHIFT != rs是判断现在的容量与sc中记录的长度是不是不一样长,不一样长的话说明不是同一长度的扩容,就要break-保证是同等长度的扩容
                sc == rs + 1 这个地方在看来是一个bug,如果已经处于扩容中,sc目前处于高16位是容量特征值,低16位标识扩容线程量特征值的状态,而此时的rs是容量特征值的状态,也就是目前sc的高16位,这里原意应该是判断目前是不是只剩下一个非初始扩容线程
                             ,为什么这么说呢,因为初始扩容线程会把sc设置称高16位是容量特征值即rs,低16位标识扩容线程量特征值(初始扩容线程+2,非初始扩容线程+1),这一步判断的就是如过只剩下一个非初始扩容线程了,说明初始扩容线程已经结束,现在已经进入扩容的最后阶段了,就不需要再继续进行辅助扩容了
                            ,正确的方式应该是sc == (rs << RESIZE_STAMP_SHIFT) + 1
                sc == rs + MAX_RESIZERS  和上面+1一样,不过这个是判断参与resize的线程是否已经达到所允许的最大值 ,看起来也是一个bug,正确的应是sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
                (nt = nextTable) == null  nextTable为空说明已经扩容完毕了,不再需要辅助扩容了
                transferIndex <= 0   // 判断下边界是不是已经到0了,扩容是根据CPU算出步长,一个扩容线程扩容一段,从高到低分配,transferIndex记录的就行目前分配到哪个下标了,如果这个下标小于等于0说明整个容器已经分配完了,就不需要在辅助扩容了
                 */
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))       // 如果是辅助扩容的sc已经处于高16位是容量特征值,低16位标识扩容线程量特征值的状态了+1让地位的扩容线程量特征值再+1
                    transfer(tab, nt);    // 进行扩容
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2)) // 初始扩容现在是+2 执行完这一步sc的值高16位是容量特征值,低16位标识扩容线程量特征值,因为resizeStamp时将高位置为1是的其又是一个负数,表示正在扩容
                transfer(tab, null);
            s = sumCount(); // 计算总数
        } // 一直循环扩容
    }
}

解析: count的维护有两种方式:未产生并发场景时通过baseCount维护,经历过并发场景后转变为通过counterCells维护。baseCount模式为直接使用int进行加减运算,cas保证同步,counterCells模式类似于1.8之前的ConcurrentHashMap,采用一个分段的概念,运算时随机找一个槽,通过cas保证一个槽的值加算同步,count即为所以槽的加和(使用LongAdder实现)。
addCount方法初始即通过counterCells是否为空(为空说明未经历初始化,使用的baseCount模式)来判断当前是否为counterCells模式,如果处于counterCells模式则进入counterCells模式计算逻辑,否则的话使用baseCount模式来维护count并记录为s,如果遭遇并发导致维护失败,则转换为counterCells模式进入counterCells模式计算逻辑。
进入counterCells模式,首先看counterCells是否已完成初始化,若为初始化进入fullAddCount,已初始化的话则随机找到counterCells中的一个槽位,如果这个槽位未初始化则进入fullAddCount,如果已初始化则对这个操作进行CAS的加操作来维护count,CAS成功则维护完成,并发导致CAS维护失败则进入fullAddCountfullAddCount有点类似于CMS的full GC退化机制,会完成counterCells的初始化以及并发冲突场景下同步完成count维护。counterCells最后则判断check是否小于等于1(1表示是当前节点的第一个元素,-1标识是replace方法或者clear过来的,这些都不需要扩容)则直接结束,不进行扩容判定,否则的话通过sumCount获得当前count并记录为s以准备后续的扩容判定。
check大于等于0(即不是replace或者clear方法引起的count改变)时进行扩容判定,执行扩容需要满足三个条件:1.count大于扩容阈值或者当前正在执行扩容操作2.容器已初始化完毕3.容量未达到最大容量。满足以上条件则需判定当前是否正在扩容,因为扩容时会将SIZECTL设置为一个特征值,这个特征值为负值,因此通过sc是否小于0来判定当前是否处于扩容状态。接下来使用resizeStamp获得容量特征值(一个表示容量n的左0数量并将高位置1的值),如果当前未处于扩容过程,则通过容量特征值获得扩容特征值(高16位为容量特征值,低16位为扩容线程量特征值)并通过CAS修改SIZECTL为扩容特征值,成功则当前线程作为第一个扩容线程启动扩容,失败则重新计算count并进行扩容判定。
如果已处于扩容过程,则判断是否还要写辅助扩容线程,满足以下几个条件则:1.此次扩容的容量与当前正在进行的容量一致2.扩容线程未达到最大额定值3.不是处于扩容收尾阶段(只剩下一个辅助扩容线程,nextTable已经为空,扩容下标已为0无法继续分配)则使用CAS修改扩容特征值成功后当前线程作为辅助扩容线程加入扩容任务,不满足条件则重新计算count并自旋进行扩容判定。

7 进行扩容-transfer

ConcurrentHashMap的实际扩容操作通过transfer来完成:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
   // CPU大于1的话  步长就是0.25n/CPU数    否则就是n     算出来的步长小于最小步长的话  就按最小步长来
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating   // nextTab == null 说明是第一个来扩容的线程
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];  // 建立一个二倍长度的数组,然后赋值给nextTab
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;      // 异常的话就是OOM?  唯一的异常也就是n过大?
            return;
        }
        nextTable = nextTab;  // 这个赋值为什么不做同步呢?不怕并发问题吗,哦  不会  因为调用这个方法的地方已经确定了,只会有一个初始线程过来
        transferIndex = n;    //   n就是新的长度  
    }
    int nextn = nextTab.length;   // 新容量
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);   // 创建一个中继节点
    boolean advance = true;       //一个终止的标记
    boolean finishing = false; // to ensure sweep before committing nextTab  // 是否已经处于收尾扫描提交阶段
    for (int i = 0, bound = 0;;) {    // 一直循环着按步长扩容
        Node<K,V> f; int fh;
        while (advance) {     //  计算出bound和i   赋值TRANSFERINDEX
            int nextIndex, nextBound;
            if (--i >= bound || finishing)        // i大于bound,说明什么呢,说明i已经在区间内了
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {      如果下一次的上边界已经到0了 也停止
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?           //nextBound作为新的上边界
                                   nextIndex - stride : 0))) {
                bound = nextBound;        //nextBound作为这次的下边界
                i = nextIndex - 1;    // 从nextIndex(上边界) - 1开始依次递减到bound区域
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {  // 如果i不在0-n范围内,已经越界
            int sc;
            if (finishing) {  // 如果已经在收尾阶段
                nextTable = null; // 重置nextTable
                table = nextTab;  // 容器确认
                sizeCtl = (n << 1) - (n >>> 1); // 2n-0.5n = 0.75*2n   就是新的扩容阈值
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {   // sc减1  意味当前扩容线程结束
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)     // 如果sc-2和容量特征值一样时,就说明现在是最后的一个扩容线程了
                    return;   // 直接返回
                finishing = advance = true;   // 不是最后的话   设置为收尾阶段?,确实,进入这里面已经越界了...
                i = n; // recheck before commit           // 提交前的重复检查?
            }
        }
        else if ((f = tabAt(tab, i)) == null)     // 如果i处还没有使用
            advance = casTabAt(tab, i, null, fwd);    // cas设置i处的新fwdnode,成功还是失败会记录在advance上面
        else if ((fh = f.hash) == MOVED)  // 如果这节点已经是一个中继节点   说明什么呢?说明已经有人处理过了上一步
            advance = true; // already processed
        else {    // 如果有值,并且不是中继节点的话
            synchronized (f) {    // 同步处理f,所以只会有一个线程作为参与者
                if (tabAt(tab, i) == f) { // 同步之后还要再判断一下  万一f被并发处理掉了呢
                    Node<K,V> ln, hn;
                    if (fh >= 0) {    // fh>=0说明是一个还没树化的链表?
                        int runBit = fh & n; 
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {    // 这是找最后一段的开头节点
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {    // 最后一段的开头节点是低位的话
                            ln = lastRun; // 开头节点赋值到ln   也就是说从lastRun开始,之后的都没有变化了,都是在同一位
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) { // 只需遍历到lastRun即可
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);   // 往前补
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);   // 往前补
                        }
                        setTabAt(nextTab, i, ln); // 赋值新容器
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);    // 旧容器节点变为中继节点,但是如果在这一步之前就有新元素来了怎么办,不会来,因为新增也是sync(f)的
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {  // 如果是一个树化节点
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :    // 退化或者重新树化
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

解析: transfer采用分段扩容的方式,从n-1->0,每个线程每次会占用一个步长的区间,然后针对这个区间进行扩容,扩容完毕再去占用下一个区间,直到无法占用新的区间则结束扩容流程。
步长的计算与应用的CPU数有关,如果当前应用CPU数为1则步长为n,即单线程库扩容。如果当前应用CPU数不为1,则为0.25n/CPU,但是如果算出的步长小于最小MIN_TRANSFER_STRIDE(16),则以MIN_TRANSFER_STRIDE为步长。
接下来通过nextTable 是否为空判断是否为初始扩容线程,如果是的话就创建一个长度为2n的新容器并记录到nextTable属性,所有扩容线程共同使用nextTable作为新容器,同时记录当前扩容进度下标transferIndex为n(扩容占用是从n到0的,因此初始未占用时就是n)。这里虽然没进行加锁但是依然不会有并发问题,因为nextTable为空的只有初始线程,而初始线程的创建则是线程安全的。
步长确认完毕,新容器创建完毕,扩容进度下标初始化完成,接下来就开始进行区间占用了:使用自旋+CAS来进行线程安全的区间占用,advance作为自旋结束的条件:如果i大于bound意味着本次占用的区间还没处理完则结束自旋。finishing意味着扩容已经进入尾声了,也无需再加入扩容,结束自旋。扩容进度下标transferIndex小于等于0说明所有的区间都已经被占用了也无需再加入扩容,结束自旋。如果以上条件不成立,则使用CAS修改扩容进度下标transferIndex尝试占用一个步长的区间,如果失败则自旋继续判定区间占用,修改成功则意味着区间占用成功,赋值本次区间的扩容游标i以及下标边界bound
区间占用成功后开始从i到bound的扩容阶段,首先需要判断i的合法性(需要在i和n之间,即判断是不是已扩容完毕,如–i的循环会使得i为负数表示扩容完毕),如果没占据到区间则判断是不是处于于finishing阶段,处于的话说明自己以及是最后一个扩容线程了,将新容器nextTable赋值为table,并设置新的扩容阈值。如果不是处于finishing阶段则因为当前线程没占据到区间需要退出扩容,那么响应的扩容特征值也要相应的-1来完成扩容线程量特征值的维护,如果扩容特征值维护失败则需要继续自旋尝试退出,如果维护成功则判断自己退出之后是不是只能先一个扩容线程,因为起始扩容线程的线程扩容量特征值为+2,所以通过-2来判断,如果自己退出之后仅剩一个线程在扩容,那么就把finishing设置为true,以使其在扩容工作结束后进行新容器的赋值以及新扩容阈值的维护。
如果i处于合法范围内,说明处于正常扩容中,那么获取i处的节点,节点为空则利用一个CAS为其赋值一个ForwardingNode节点,这个节点是一个中继节点,意味着这个节点正处于扩容状态,其hash值为特殊值MOVED(-1),在5 新数据载入-putVal 的过程中如果发现目标节点hash值为MOVED,那么putVal线程就会暂停put操作,作为辅助扩容线程先行扩容容器,扩容完毕后再进行put操作。
如果i处节点不会空但是hash值已经是MOVED,那么说明节点已经处于迁移状态则跳过。
如果以上均不符合,说明i合法,且i处拥有实际的数据节点,那么使用i处的节点f作为锁通过synchronized来达成线程安全的扩容,因为putVal过程中也是用i处的节点f作为锁进行synchronized的,意味着对i处的操作扩容和赋值只有一个过程能操作,以此来保证putVal和transfer的并发安全性。synchronized内部的扩容过程因为已经保证了线程同步,因此和HashMap的扩容过程区别并不大,这里不再描述,感兴趣的话可以查看9: 从源码看HashMap:一文看懂HashMap了解。

8 辅助扩容-helpTransfer

5 新数据载入-putVal 中以及7 进行扩容-transfer中都提到过如果putVal过程中发现目标节点是一个中继节点ForwardingNode(hash值为特殊值MOVED)那么putVal会暂停put操作,通过helpTransfer方法进行辅助扩容:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

解析: 这个方法是不是看起来很熟悉,是的,除了ForwardingNode判断,其主要逻辑和6 维护与启动扩容-addCount判定扩容时的逻辑大致相当。因此也不在额外表述。这里贴出来的目的仅仅是为了完整的体现putVal遭遇扩容并发的处理过程。

以上就是JAVA核心知识之从源码看ConcurrentHashMap的详细内容,更多关于JAVA核心知识之从源码看ConcurrentHashMap的资料请关注服务器之家其它相关文章!

原文链接:https://blog.csdn.net/yue_hu/article/details/113754357

延伸 · 阅读

精彩推荐