Java8集合系列之ConcurrentLinkedQueue(七)

队列是一种经常被用到的数据结构,比如在生产者消费者场景中,简单则如一个链表实现,复杂则想kafka、rocketmq等等分布式消息队列。本文基于JDK1.8来分析java.util.concurrent包中的ConcurrentLinkedQueue,这个基于链表和CAS的并发优化的队列。

内部类

Node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private static class Node<E> {
volatile E item;
volatile Node<E> next;
//构造结点
Node(E item) {
//设置item的值
UNSAFE.putObject(this, itemOffset, item);
}
//修改item的值
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//设置next指针
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
//修改next指针
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
private static final sun.misc.Unsafe UNSAFE;
//下面分别是item和next的偏移量
private static final long itemOffset;
private static final long nextOffset;
//通过反射机制获取值
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}

相比较LinkedList中内部类Node的简单(参见LinkedList),ConcurrentLinkedQueue的Node则费了不少心思。

head&tail

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
//CAS修改tail值
private boolean casTail(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}
//CAS修改head值
private boolean casHead(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}
// Unsafe mechanics
//初始化的时候就会获取head和tail的偏移量
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
private static final long tailOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentLinkedQueue.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("tail"));
} catch (Exception e) {
throw new Error(e);
}
}

构造方法

1
2
3
4
5
public ConcurrentLinkedQueue() {
//创建一个item为null的结点,next也为null
//头指针和尾指针都指向该结点
head = tail = new Node<E>(null);
}

add

添加元素不能为null,否则抛出NPE。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//将指定元素插入队列尾部,作为无界队列,永远不会抛出IllegalStateException异常,也不会返回false。
//添加null元素会抛出NullPointerException
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
//检查e不为null,创建新结点
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p是尾结点,用CAS原子操作修改next
if (p.casNext(null, newNode)) {
if (p != t)
casTail(t, newNode);
return true;
}
// 失去CAS竞争给另一个线程; 重新读next
}
else if (p == q)
p = (t != (t = tail)) ? t : head;
else
//t != (t = tail)的运算逻辑就是用t之前的值比较更新的值
p = (p != t && t != (t = tail)) ? t : q;
}
}

这个方法中的逻辑看了半天,才看明白,查了网上的资料发现大多说的也不清不楚,索性整理一下记下来。我们首先看单线程的情况。
第一次添加元素,此时head、tail都指向头结点。
1、取tail的值赋给p、t.
2、执行Node q = p.next;此时q必定为null,所以进入第一个分支,p.next指向newNode,因为此时p==t所以,跳过,直接返回true。(注意并没有更新tail的值)
第二次添加元素,此时head、tail依旧指向头结点。
1、取tail的值赋给p、t.
2、执行Node q = p.next;判断q是否为null。此时q不为null,p!=p(这个也是必然的),进入第三个分支。
3、p==t,所以根据三目运算符,p被赋值为q,这一步结束后p指向了第一次新添加的结点,相当于完成了p指针向尾部元素的推移。
4、开始循环,进入上述第二步,此时q为null,所以照常添加第二个元素,接着判断p是否等于t,我们知道tail此时仍然指向头结点,未曾变化,所以p!=t,这样就完成了tail的更新,tail指向尾结点。

这样,单线程的添加是没有问题了,主要就是设计第一个和第三分支的代码逻辑。我们再分析多线程同时添加元素的情况,此时tail指向尾结点:
1、假设某个线程A在第一个分支q==null进入之后,casNext操作之前,另外一个线程B竞争成功,这样A操作p.casNext(null, newNode)失败。
2、继续循环,执行Node q = p.next;此时q当然不为null,应该是指向了更新的结点。
3、进入第三个分支,p==t,导致p赋值为q,后面的流程就是跟之前的单线程差不多了,可以自行推导。

这样看来第三个分支的作用主要就是,更新变量p:

  • 使p指向更新后的tail尾结点。(多线程中)
  • 使p一步步的向尾部迁移。(单线程中)

这种CAS乐观锁的方式相比较锁定而言,效率提高不少。

poll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {//如果q为null,说明到了尾结点,更新head
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}

同样,poll方法也可以分为单线程和多线程两种情况分析。
先看单线程情况,此时假设head指向头结点,item等于null:

1、第一次弹出元素,进入第三个分支,很显然,p开始指向头部第一个元素。
2、进入循环,item!=null成立,单线程情况下p.casItem(item, null)成功,p != h成立,开始执行updateHead方法,具体见上面代码,更新head值,head指向p.next(假设不为null),返回item。
3、第二次弹出,此时item不为null,经过casItem之后,p.item为null,但是此时p依旧等于h,跳过,直接返回item。

我们发现两次弹出元素之后,不是每次都更新head值。这是单线程的情况,多线程无非就是在第一个分支casItem失败之后,执行最后一步p=q,让p挪到下一个结点。

以上是无限并发非阻塞队列ConcurrentLinkedQueue的主要内容,至于还有peek方法是只读取而不删除头部结点,在此不做分析。

-EOF-