0%

深入浅出Java多线程-阻塞队列BlockingQueue

阻塞队列的由来

在生产者-消费者模式中,为了使生产者消费者解藕,需要一个存放元素的容器,使生产者可以只关心往队列里添加元素下,消费者只关系从队列中取出元素进程处理。

而且这个队列必须要满足两点:

  • 线程安全
  • 缓冲池空了,我们需要阻塞消费者,唤醒生产者;当缓冲池满了,我们需要阻塞生产者,唤醒消费者

JDK 为此设计了 阻塞队列(BlockingQueue),并提供了几个基于 BlockingQueue 接口 实现的一些线程安全的阻塞队列。

BlockingQueue的操作方法:

阻塞队列提供了四组不同的方法用于插入、移除、检查元素:

功能分类 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() - -
  • 抛出异常:如果试图的操作无法立即执行,抛异常。当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。

  • 返回特殊值:插入方法会返回是否成功,成功则返回 true。移除方法,则是从队列里拿出一个元素,如果没有则返回 null

  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里 put 元素,队列会一直阻塞生产者线程,直到拿到数据,或者- 响应中断退出。当队列空时,消费者线程试图从队列里 take 元素,队列也会阻塞消费者线程,直到队列可用。

  • 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

需要特别注意的是:

  • 不能往阻塞队列中插入null,会抛出空指针异常。
  • 可以访问阻塞队列中的任意元素,调用remove(o)可以将队列之中的特定对象移除,但并不高效,尽量避免使用。

JDK 提供的阻塞队列

JDK 定义了 BlockingQueue 接口,并在Java util.concurrent 下提供了一些实现类。

0aa769cbe5c1e7beb7730c803df1285f.png

JDK7 主要提供了 7 个阻塞队列,分别是:

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。

根据是否是有界,是否有加锁,数据结构,汇总表格如下:

阻塞队列 有界性 数据结构
ArrayBlockingQueue bounded 加锁 arraylist
LinkedBlockingQueue optionally-bounded 加锁 linkedlist
LinkedBlockingDeque optionally-bounded 加锁 linkedlist
LinkedTransferQueue unbounded 无锁(CAS实现) linkedlist
PriorityBlockingQueue unbounded 加锁 heap
DelayQueue unbounded 加锁 heap

阻塞队列的实现原理 以 ArrayBlockingQueue 为例子

阻塞队列的原理很简单,利用了Lock锁的多条件(Condition)阻塞控制。接下来我们分析ArrayBlockingQueue JDK 1.8 的源码。

构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public ArrayBlockingQueue(int capacity) {
//..省略代码
}

public ArrayBlockingQueue(int capacity, boolean fair) {
//..省略代码

}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
//..省略代码

}

可以初始化队列大小, 且一旦初始化不能改变。构造方法中的fair表示控制对象的内部锁是否采用公平锁,默认是非公平锁。Collection 可以传入最初包含的元素的集合。

首先是构造器,除了初始化队列的大小和是否是公平锁之外,还对同一个锁(lock)初始化了两个监视器 Condition,分别是notEmpty和notFull。这两个监视器的作用目前可以简单理解为标记分组,当该线程是put操作时,给他加上监视器notFull,标记这个线程是一个生产者;当线程是take操作时,给他加上监视器notEmpty,标记这个线程是消费者。

下面是初始化代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//数据元素数组
final Object[] items;

//下一个待取出元素索引
int takeIndex;

//下一个待添加元素索引
int putIndex;

//元素个数
int count;

//内部锁
final ReentrantLock lock;

//消费者监视器 Condition for waiting takes
private final Condition notEmpty;

//生产者监视器 Condition for waiting puts
private final Condition notFull;

public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and the specified access policy.
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

put 操作的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 1.自旋拿锁
lock.lockInterruptibly();
try {
// 2.判断队列是否满了
while (count == items.length)
// 2.1如果满了,阻塞该线程,并标记为notFull线程,
// 等待notFull的唤醒,唤醒之后继续执行while循环。
notFull.await(); //底层是通过LockSupport.park 调用 UNSAFE.park() 实现

// 3.如果没有满,则进入队列
enqueue(e);
} finally {
lock.unlock();
}
}

//入队并通过 notEmpty.signal() 唤醒一个消费者线程
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 4 唤醒一个等待的线程
notEmpty.signal(); //底层是通过LockSupport.unpark 调用 UNSAFE.unpark(thread) 实现
}

总结put的流程:

  1. 所有执行put操作的线程竞争lock锁,拿到了lock锁的线程进入下一步,没有拿到lock锁的线程自旋竞争锁。

  2. 判断阻塞队列是否满了,如果满了,则调用await方法阻塞这个线程,并标记为notFull(生产者)线程,同时释放lock锁,等待被消费者线程唤醒。

  3. 如果没有满,则调用enqueue方法将元素put进阻塞队列。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了lock锁的线程。

  4. 唤醒一个标记为notEmpty(消费者)的线程。

take操作的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//用while 而不用 if 是为了唤醒后重新判断一次,避免count状态发生变化
while (count == 0)
notEmpty.await(); //底层是通过LockSupport.park 调用 UNSAFE.park() 实现
return dequeue();
} finally {
lock.unlock();
}
}

//从队列取出一个元素并通过 notFull.signal 唤醒生产者线程
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); //底层是通过LockSupport.unpark 调用 UNSAFE.unpark(thread) 实现
return x;
}

take操作和put操作的流程是类似的,总结一下take操作的流程:

  1. 所有执行take操作的线程竞争lock锁,拿到了lock锁的线程进入下一步,没有拿到lock锁的线程自旋竞争锁。

  2. 判断阻塞队列是否为空,如果是空,则调用await方法阻塞这个线程,并标记为notEmpty(消费者)线程,同时释放lock锁,等待被生产者线程唤醒。

3.如果没有空,则调用dequeue方法。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了lock锁的线程。

  1. 唤醒一个标记为notFull(生产者)的线程。

需要注意:

  1. put和take操作都需要先获取锁,没有获取到锁的线程会被挡在第一道大门之外自旋拿锁,直到获取到锁。

  2. 就算拿到锁了之后,也不一定会顺利进行put/take操作,需要判断队列是否可用(是否满/空),如果不可用,则会被阻塞,并释放锁。

  3. 在第2点被阻塞的线程会被唤醒,但是在唤醒之后,依然需要拿到锁才能继续往下执行,否则,自旋拿锁

  4. await 前面用while 而不用 if 是为了唤醒后重新判断一次,避免count状态发生变化。这里是有讲究的,因为这个线程被唤醒后条件里的值很可能已经改变了,不再满足了,如果用 if,线程唤醒后会根据程序计数器的记录直接执行if后面的逻辑,而用while 可以确保再判断一次。

通过上面代码可以看到, put, take 内部是用在没有成功之前是会一直阻塞的。下面看下 offer 和 poll 对比下

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and {@code false} if this queue
* is full. This method is generally preferable to method {@link #add},
* which can fail to insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

通过以上代码可以看到 通过 offer 方法插入的话是会立马返回结果 true / false, 而通过 poll 从队列删除元素 成功会直接返回元素对象, 失败会返回 null.

案例:实现一个简单的在生产者消费者模型

通过 ArrayBlockingQueue 实现一个最简单的在生产者-消费者模型:

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class ArrayBlockingQueueDemo {
private int queueSize = 10;
private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);

public static void main(String[] args) {
ArrayBlockingQueueDemo test = new ArrayBlockingQueueDemo();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();

producer.start();
consumer.start();
}

class Consumer extends Thread {
@Override
public void run() {
consume();
}

private void consume() {
while (true) {
try {
queue.take();
Thread.sleep(new Random().nextInt(2000));
System.out.println("从队列取走一个元素,队列剩余" + queue.size() + "个元素");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

class Producer extends Thread {
@Override
public void run() {
produce();
}

private void produce() {
while (true) {
try {
Thread.sleep(new Random().nextInt(1000));
queue.put(1);
System.out.println("向队列取中插入一个元素,队列剩余空间:" + (queueSize - queue.size()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

运行结果如下:
157e3e8fd16022e3003be6f8e750f981.png

高并发下无界队列容易产生OOM问题

队列按实现可以分为有界队列,与无界队列。在高并发环境,生产者的生产速度往往比消费者速度快很多,如果使用无界队列,队列无限扩大,容易吃掉内存导致 OOM。

案例

案例:在高并发环境下使用无界队列 ConcurrentLinkedQueue,生产者速度比消费者速度快,导致OOM。用 Jprofiler 分析dump下来的内存镜像如下所示。

5c104305c6715eb84ee00d4b51584101.png

看下 ConcurrentLinkedQueue 的 offer 方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}.
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);

for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}

可以看到由于是没有限制的,offer 的结果永远都是成功的,这样队列就会无限扩张而吃掉内存,导致OOM。

在JDK提供的线程安全的内置队列分类

队列 有界性 数据结构
ArrayBlockingQueue bounded 加锁 arraylist
LinkedBlockingQueue optionally-bounded 加锁 linkedlist
LinkedBlockingDeque optionally-bounded 加锁 linkedlist
ConcurrentLinkedQueue unbounded 无锁(CAS实现) linkedlist
LinkedTransferQueue unbounded 无锁(CAS实现) linkedlist
PriorityBlockingQueue unbounded 加锁 heap
DelayQueue unbounded 加锁 heap

队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。

我们就从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;基于链表的线程安全队列分成LinkedBlockingQueue和ConcurrentLinkedQueue两大类,前者也通过锁的方式来实现线程安全,而后者以及上面表格中的LinkedTransferQueue都是通过原子变量compare and swap(以下简称“CAS”)这种不加锁的方式来实现的。

通过不加锁的方式实现的队列都是无界的(无法保证队列的长度在确定的范围内);而加锁的方式,可以实现有界队列。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同时,为了减少Java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue

总结:在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列推荐使用在ArrayBlockingQueue

参考文献

1.阻塞队列
2.聊聊并发(七)——Java中的阻塞队列
3.高性能队列——Disruptor