Java8集合系列之ConcurrentHashMap(五)

Before JDK1.8版本

Hashtable

我们都知道HashMap是线程不安全的。Hashtable是线程安全的。看过Hashtable源码的我们都知道Hashtable的线程安全是采用在每个方法来添加了synchronized关键字来修饰,即Hashtable是针对整个table的锁定,这样就导致HashTable容器在竞争激烈的并发环境下表现出效率低下。翻阅Hashtable的源码,确实在put和get方法之前都添加了synchronized来完成同步操作,这样无论是在put还是get的时候只能同时进行一种操作,毫无疑问这种效率是极其低下的。

1
2
3
4
5
6
public synchronized V put(K key, V value) {
...
}
public synchronized V get(Object key) {
...
}

锁分离技术

基于Hashtable的缺点,人们就开始思考,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率呢??这就是我们的“锁分离”技术,这也是ConcurrentHashMap实现的基础。

ConcurrentHashMap使用的就是锁分段技术,ConcurrentHashMap由多个Segment组成(Segment下包含很多Node,也就是我们的键值对了),每个Segment都有把锁来实现线程安全,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

ConcurrentHashMap JDK1.8

前言

  1. JDK1.8的ConcurrentHashMap中Segment虽保留,但已经简化属性,仅仅是为了兼容旧版本。
  2. ConcurrentHashMap的底层与Java1.8的HashMap有相通之处,底层依然由“数组”+链表+红黑树来实现的,底层结构存放的是TreeBin对象,而不是TreeNode对象。
  3. ConcurrentHashMap实现中借用了较多的CAS算法,unsafe.compareAndSwapInt(this, valueOffset, expect, update); CAS(Compare And Swap)。
    ConcurrentHashMap既然借助了CAS来实现非阻塞的无锁实现线程安全,那么是不是就没有用锁了呢?在操作hash值相同的链表的头结点还是会synchronized上锁,这样才能保证线程安全。因为修改链表头结点会涉及到一系列的操作,这些操作必须要保证原子性。

重要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 盛装Node元素的数组 它的大小是2的整数次幂
* Size is always a power of two. Accessed directly by iterators.
*/
transient volatile Node<K,V>[] table;
//默认容量
private static final int DEFAULT_CAPACITY = 16;
/**
* hash表初始化或扩容时的一个控制位标识量。
* 负数代表正在进行初始化或扩容操作
* -1代表正在初始化
* -N 表示有N-1个线程正在进行扩容操作
* 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小,这一点类似于扩容阈值的概念。还后面可以看到,它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。
*/
private transient volatile int sizeCtl; //volatile变量

Node/TreeNode/TreeBin

  • Node类
    Node是最核心的内部类,它包装了key-value键值对,所有插入ConcurrentHashMap的数据都包装在这里面。它与HashMap中的定义很相似,但是但是有一些差别就是value和next设置成了volatile变量,不允许调用setValue方法直接改变Node的value域,它增加了find方法辅助map.get()方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//1.Node类是没有提供修改入口的(setValue方法抛异常,供子类实现),即是可读的。只能用于只读遍历。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;//2.volatile,保证可见性
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
//3.setValue跑出异常
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
  • TreeNode类
    树节点类,另外一个核心的数据结构。当链表长度过长的时候,会转换为TreeNode。但是与HashMap不相同的是,它并不是直接转换为红黑树,而是把每个Node结点包装成TreeNode,然后将这些TreeNode放在一个TreeBin对象中,由TreeBin完成对红黑树的包装。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
//此处调用了Node的构造方法,实际还是一个Node
super(hash, key, val, next);
this.parent = parent;
}
Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
//根据key找到所属的TreeNode
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
...
}
}
  • TreeBin类
    这个类并不负责包装用户的key、value信息,而是包装的很多TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象,这是与HashMap的区别。另外这个类还带有了读写锁。
    下图是整个TreeBin类的结构图:


TreeBin类中除了TreeBin构造方法,还有rotateLeft、rotateRight和balanceInsertion等,这些概念我们在数据结构中应该接触过,另外,还有lockRoot、unlockRoot方法,是为了树在restructuring的时候,进行写锁保护(write lock)。
下面主要来看看TreeBin构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
TreeNode<K,V> root;//TreeBin指向红黑树的根节点
volatile TreeNode<K,V> first;
/**
* Creates bin with initial set of nodes headed by b.
*/
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);//hash = TREEBIN = -2;
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;//next指向x的下一个结点
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);//检查红黑树的合法性
}

Unsafe与CAS

在ConcurrentHashMap中,随处可以看到U, 大量使用了U.compareAndSwapXXX的方法,这个方法是利用一个CAS算法实现无锁化的修改值的操作,他可以大大降低锁带来的性能消耗。这个算法的基本思想就是不断地去比较当前内存中的变量值与你指定的一个变量值是否相等,如果相等,则接受你指定的修改的值,否则拒绝你的操作。CAS的详细可以参见CAS原子操作.
下面是基于CAS的对指定位置数据的修改方法,分别是调用的U的三个原子操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
//获得在i位置上的Node节点
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
//利用CAS算法设置i位置上的Node节点。这个方法是CAS原子操作的典型应用。
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//设置结点
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

initTable

对于ConcurrentHashMap来说,调用它的构造方法仅仅是设置了一些参数而已。而整个table的初始化是在向ConcurrentHashMap中插入元素的时候发生的。如调用put、computeIfAbsent、compute、merge等方法的时候,调用时机是检查table==null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//sizeCtl表示有其他线程正在进行初始化操作,把线程挂起。对于table的初始化工作,只能有一个线程在进行。
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//利用CAS方法把sizectl的值置为-1 表示本线程正在进行初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);//sc=容量*0.75
}
} finally {
sizeCtl = sc;//sizeCtl设置sc
}
break;
}
}
return tab;
}

put

前面的所有的介绍其实都为这个方法做铺垫。ConcurrentHashMap最常用的就是put和get两个方法,不允许key或value为null的情况放入,这一点跟HashMap相反。整体流程就是对于每一个放入的值,首先利用spread方法对key的hashcode进行一次hash计算,由此来确定这个值在table中的位置。

  • 如果这个位置是空的,那么直接放入,而且不需要加锁操作。
  • 如果这个位置不是空的,存在结点,先要加锁,也说明发生了hash碰撞,首先判断这个节点的类型。
    • 如果是链表结点Node(fh>0),则得到的结点就是hash值相同的节点组成的链表的头节点。需要依次向后遍历确定这个新加入的值所在位置。如果遇到hash值与key值都与新加入节点是一致的情况,则只需要更新value值即可。否则依次向后遍历,直到链表尾插入这个结点。如果加入这个节点以后链表长度大于8,就把这个链表转换成红黑树。
    • 如果这个节点的类型已经是树节点的话,直接调用树节点的插入方法进行插入新的值。

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
//不允许 key或value为null
if (key == null || value == null) throw new NullPointerException();
//计算hash值
int hash = spread(key.hashCode());
int binCount = 0;
//死循环 何时插入成功 何时跳出
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果table为空的话,初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//根据hash值计算出在table里面的位置
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果这个位置没有值 ,直接放进去,不需要加锁
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//当遇到表连接点时,需要进行整合表的操作
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//结点上锁 这里的结点可以理解为hash值相同组成的链表的头结点
synchronized (f) {
if (tabAt(tab, i) == f) {
//fh>0 说明这个节点是一个链表的节点 不是树的节点
if (fh >= 0) {
binCount = 1;
//在这里遍历链表所有的结点
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果hash值和key值相同 则修改对应结点的value值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//如果遍历到了最后一个结点,那么就证明新的节点需要插入 就把它插入在链表尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果这个节点是树节点,就按照树的方式插入值
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//如果链表长度已经达到临界值8 就需要把链表转换为树结构
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//将当前ConcurrentHashMap的元素数量+1
addCount(1L, binCount);
return null;
}

下面是treeifyBin方法,这个方法用于将过长的链表转换为TreeBin对象。但是他并不是直接转换,而是进行一次容量判断,如果容量没有达到转换的要求,直接进行扩容操作并返回;如果满足条件才链表的结构抓换为TreeBin ,这与HashMap不同的是,它并没有把TreeNode直接放入红黑树,而是利用了TreeBin这个小容器来封装所有的TreeNode.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)//如果table.length<64 就扩大一倍 返回
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
//构造了一个TreeBin对象 把所有Node节点包装成TreeNode放进去
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);//这里只是利用了TreeNode封装 而没有利用TreeNode的next域和parent域
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
//在原来index的位置 用TreeBin替换掉原来的Node对象
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}

get

get方法比较简单,给定一个key来确定value的时候,必须满足两个条件 key相同 hash值相同,对于节点可能在链表或树上的情况,需要分别去查找.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public V get(Object key) {
Node<K,V>[] tab;
Node<K,V> e, p;
int n, eh;
K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)//说明是树,调用TreeBin的find方法
return (p = e.find(h, key)) != null ? p.val : null;
//如果是链表,则遍历查询
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

参考

ConcurrentHashMap源码分析(JDK8版本)
《Java源码分析》:ConcurrentHashMap JDK1.8

-EOF-