Java8集合系列之LinkedBlockingQueue(八)

上一篇文章分析了ConcurrentLinkedQueue,实现了无限不定长的无锁并发的队列,线程安全。这篇文章则分析另外一种队列实现,线程安全的阻塞队列(BlockingQueue)。

阻塞队列浅析

阻塞队列与非阻塞队列

之前我们接触的linkedlist是非阻塞队列,LinkedList是双向链表,它实现了Dequeue接口。
使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦。
但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素。当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒)。这样提供了极大的方便性。

常用阻塞队列

自从Java 1.5之后,在java.util.concurrent包下提供了若干个阻塞队列,主要有以下几个:

  • ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。
  • LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。

还有另外两种,PriorityBlockingQueue和DelayQueue,前者会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。而后者是一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。

LinkedBlockingQueue源码解析

LinkedBlockingQueue继承关系如下,可以看到该队列实现了阻塞队列的接口,并继承了抽象队列:

1
2
LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable

下图是BlockingQueue接口的结构:
图一
不同的方法实现效果不同,整理后如下表所示:
图二

内部类Node

接着来看LinkedBlockingQueue的内部类Node,也就是链表的结点,这个很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, 当前结点为尾结点
*/
Node<E> next;
Node(E x) { item = x; }
}

主要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** 容量约束,如果不设则默认为Integer.MAX_VALUE */
private final int capacity;
/**当前元素数量*/
private final AtomicInteger count = new AtomicInteger();
/**头尾指针变量*/
transient Node<E> head;
private transient Node<E> last;
/** 头部锁:take, poll等操作是lock*/
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/**尾部锁:put, offer等操作时lock*/
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

构造方法

1
2
3
4
5
6
7
8
9
10
11
/**创建队列,默认队列的容量大小限制是Integer.MAX_VALUE*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**指定容量限制*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();//非法参数异常
this.capacity = capacity;
last = head = new Node<E>(null);//头尾结点指向同一个空结点
}

add/remove/element

这三个方法并不是在LinkedBlockingQueue的类中实现,而是如下图:

是在AbstractQueue抽象类中实现,代码逻辑如下;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

真相一目了然,只不过是对offer/poll/peek方法的包装而已,正如图二中所示,offer/poll/peek执行失败直接返回false和null,而add/remove/element取而代之的是抛出三个异常而已。接下来分别看offer和poll方法的实现。

offer/poll/peek

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
//原子处理类
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;//到达容量限制,直接返回false
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();//上锁(如果有其他线程持有锁,则等待)
try {
if (count.get() < capacity) {//再判断一次
enqueue(node);//插入结点
c = count.getAndIncrement();//原子操作递增,返回递增前的数量
if (c + 1 < capacity)
//释放不满的信号给putLock锁的等待队列
notFull.signal();
}
} finally {
putLock.unlock();//释放锁
}
if (c == 0)
//之前的队列为空,添加一个之后释放不空的信号给takeLock锁的等待队列
signalNotEmpty();
return c >= 0;
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}

以下为poll方法的源码,大同小异:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;//队列没有元素的时候,二话不说返回null
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {//再判断一次是否为0
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;//
Node<E> first = h.next;//第一个元素结点
h.next = h; // help GC(头结点自己指向自己)
head = first;//head变量指向第一个元素结点
E x = first.item;//获取第一个元素值
first.item = null;//第一个元素值赋空,成为头结点
return x;
}

peek方法的实现就更简单了,不再赘述。

put/take

据图二所知,put、take方法是两个具备阻塞功能的方法,具体阻塞和非阻塞的区别已经在文章首部说明,在此不在阐述。put方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//putLock上锁(也是一直等到到持有锁,但是接受中断信号)
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
//如果容量达到限制就一直等待,这个就是阻塞了,直到被“唤醒”,相比较非阻塞而言如果达到容量限制,直接跳过了,返回false。
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}

take的方法与put方法类似,关键在于阻塞。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//当队列为空一直阻塞
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

延迟方法 offer/poll

设定一定的等待时间timeout,超过这个时间前者返回false后者返回null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);//限定等待时长
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//如果容量一直处于上限,则等待
while (count.get() == capacity) {
if (nanos <= 0)//小于0,时间到,返回false
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}

延迟poll方法代码类似,如下图所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//队列为空,则阻塞
while (count.get() == 0) {
if (nanos <= 0)//等待一定时间无济于事则返回null
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

这样LinkedBlockingQueue的源码分析,便可结束,总结几点如下:

  1. 通过ReentrantLock锁的方式,对于offer/poll/peek操作都需要上锁,在大量并发的情况下,效率较低。
  2. 自带阻塞的方法便于处理带消费者生产者场景的程序,简化了开发的难度。
  3. 默认容量限制为:Integer.MAX_VALUE=0x7fffffff=2^31-1。

参考资料

Java并发编程:阻塞队列