阻塞队列的由来
在生产者-消费者模式中,为了使生产者消费者解藕,需要一个存放元素的容器,使生产者可以只关心往队列里添加元素下,消费者只关系从队列中取出元素进程处理。
而且这个队列必须要满足两点:
- 线程安全
- 缓冲池空了,我们需要阻塞消费者,唤醒生产者;当缓冲池满了,我们需要阻塞生产者,唤醒消费者
JDK 为此设计了 阻塞队列(BlockingQueue),并提供了几个基于 BlockingQueue 接口 实现的一些线程安全的阻塞队列。
BlockingQueue的操作方法:
阻塞队列提供了四组不同的方法用于插入、移除、检查元素:
功能分类 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | - | - |
抛出异常:如果试图的操作无法立即执行,抛异常。当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
返回特殊值:插入方法会返回是否成功,成功则返回 true。移除方法,则是从队列里拿出一个元素,如果没有则返回 null
一直阻塞:当阻塞队列满时,如果生产者线程往队列里 put 元素,队列会一直阻塞生产者线程,直到拿到数据,或者- 响应中断退出。当队列空时,消费者线程试图从队列里 take 元素,队列也会阻塞消费者线程,直到队列可用。
超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
需要特别注意的是:
- 不能往阻塞队列中插入null,会抛出空指针异常。
- 可以访问阻塞队列中的任意元素,调用remove(o)可以将队列之中的特定对象移除,但并不高效,尽量避免使用。
JDK 提供的阻塞队列
JDK 定义了 BlockingQueue 接口,并在Java util.concurrent 下提供了一些实现类。
JDK7 主要提供了 7 个阻塞队列,分别是:
- ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
根据是否是有界,是否有加锁,数据结构,汇总表格如下:
阻塞队列 | 有界性 | 锁 | 数据结构 |
---|---|---|---|
ArrayBlockingQueue | bounded | 加锁 | arraylist |
LinkedBlockingQueue | optionally-bounded | 加锁 | linkedlist |
LinkedBlockingDeque | optionally-bounded | 加锁 | linkedlist |
LinkedTransferQueue | unbounded | 无锁(CAS实现) | linkedlist |
PriorityBlockingQueue | unbounded | 加锁 | heap |
DelayQueue | unbounded | 加锁 | heap |
阻塞队列的实现原理 以 ArrayBlockingQueue 为例子
阻塞队列的原理很简单,利用了Lock锁的多条件(Condition)阻塞控制。接下来我们分析ArrayBlockingQueue JDK 1.8 的源码。
构造函数:
1 | public ArrayBlockingQueue(int capacity) { |
可以初始化队列大小, 且一旦初始化不能改变。构造方法中的fair表示控制对象的内部锁是否采用公平锁,默认是非公平锁。Collection 可以传入最初包含的元素的集合。
首先是构造器,除了初始化队列的大小和是否是公平锁之外,还对同一个锁(lock)初始化了两个监视器 Condition,分别是notEmpty和notFull。这两个监视器的作用目前可以简单理解为标记分组,当该线程是put操作时,给他加上监视器notFull,标记这个线程是一个生产者;当线程是take操作时,给他加上监视器notEmpty,标记这个线程是消费者。
下面是初始化代码:
1 | //数据元素数组 |
put 操作的源码如下:
1 | public void put(E e) throws InterruptedException { |
总结put的流程:
所有执行put操作的线程竞争lock锁,拿到了lock锁的线程进入下一步,没有拿到lock锁的线程自旋竞争锁。
判断阻塞队列是否满了,如果满了,则调用await方法阻塞这个线程,并标记为notFull(生产者)线程,同时释放lock锁,等待被消费者线程唤醒。
如果没有满,则调用enqueue方法将元素put进阻塞队列。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了lock锁的线程。
唤醒一个标记为notEmpty(消费者)的线程。
take操作的源码如下:
1 | public E take() throws InterruptedException { |
take操作和put操作的流程是类似的,总结一下take操作的流程:
所有执行take操作的线程竞争lock锁,拿到了lock锁的线程进入下一步,没有拿到lock锁的线程自旋竞争锁。
判断阻塞队列是否为空,如果是空,则调用await方法阻塞这个线程,并标记为notEmpty(消费者)线程,同时释放lock锁,等待被生产者线程唤醒。
3.如果没有空,则调用dequeue方法。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了lock锁的线程。
- 唤醒一个标记为notFull(生产者)的线程。
需要注意:
put和take操作都需要先获取锁,没有获取到锁的线程会被挡在第一道大门之外自旋拿锁,直到获取到锁。
就算拿到锁了之后,也不一定会顺利进行put/take操作,需要判断队列是否可用(是否满/空),如果不可用,则会被阻塞,并释放锁。
在第2点被阻塞的线程会被唤醒,但是在唤醒之后,依然需要拿到锁才能继续往下执行,否则,自旋拿锁
await 前面用while 而不用 if 是为了唤醒后重新判断一次,避免count状态发生变化。这里是有讲究的,因为这个线程被唤醒后条件里的值很可能已经改变了,不再满足了,如果用 if,线程唤醒后会根据程序计数器的记录直接执行if后面的逻辑,而用while 可以确保再判断一次。
通过上面代码可以看到, put, take 内部是用在没有成功之前是会一直阻塞的。下面看下 offer 和 poll 对比下
代码如下:
1 | /** |
通过以上代码可以看到 通过 offer 方法插入的话是会立马返回结果 true / false, 而通过 poll 从队列删除元素 成功会直接返回元素对象, 失败会返回 null.
案例:实现一个简单的在生产者消费者模型
通过 ArrayBlockingQueue 实现一个最简单的在生产者-消费者模型:
代码如下:
1 | public class ArrayBlockingQueueDemo { |
运行结果如下:
高并发下无界队列容易产生OOM问题
队列按实现可以分为有界队列,与无界队列。在高并发环境,生产者的生产速度往往比消费者速度快很多,如果使用无界队列,队列无限扩大,容易吃掉内存导致 OOM。
案例
案例:在高并发环境下使用无界队列 ConcurrentLinkedQueue,生产者速度比消费者速度快,导致OOM。用 Jprofiler 分析dump下来的内存镜像如下所示。
看下 ConcurrentLinkedQueue 的 offer 方法的实现如下:
1 | /** |
可以看到由于是没有限制的,offer 的结果永远都是成功的,这样队列就会无限扩张而吃掉内存,导致OOM。
在JDK提供的线程安全的内置队列分类
队列 | 有界性 | 锁 | 数据结构 |
---|---|---|---|
ArrayBlockingQueue | bounded | 加锁 | arraylist |
LinkedBlockingQueue | optionally-bounded | 加锁 | linkedlist |
LinkedBlockingDeque | optionally-bounded | 加锁 | linkedlist |
ConcurrentLinkedQueue | unbounded | 无锁(CAS实现) | linkedlist |
LinkedTransferQueue | unbounded | 无锁(CAS实现) | linkedlist |
PriorityBlockingQueue | unbounded | 加锁 | heap |
DelayQueue | unbounded | 加锁 | heap |
队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。
我们就从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;基于链表的线程安全队列分成LinkedBlockingQueue和ConcurrentLinkedQueue两大类,前者也通过锁的方式来实现线程安全,而后者以及上面表格中的LinkedTransferQueue都是通过原子变量compare and swap(以下简称“CAS”)这种不加锁的方式来实现的。
通过不加锁的方式实现的队列都是无界的(无法保证队列的长度在确定的范围内);而加锁的方式,可以实现有界队列。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同时,为了减少Java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue
总结:在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列推荐使用在ArrayBlockingQueue