BlockingQueue的实现类
在 Java 并发编程中,BlockingQueue
是 java.util.concurrent
包下的一个关键接口,主要用于在多线程环境中安全地传递数据。它提供了阻塞操作,能够有效地控制生产者-消费者模型。本文将介绍 BlockingQueue
在 Java 集合框架中的层级结构,并深入分析其主要实现类:ArrayBlockingQueue
、SynchronousQueue
和 LinkedBlockingQueue
。
从层级结构可以看出,BlockingQueue
继承自 Queue
,同时也是 Collection
接口的一部分。它的主要特点是提供了线程安全的阻塞方法,如 put()
和 take()
,以控制数据的流动。
BlockingQueue
BlockingQueue接口都提供了以下几个方法来操作队列
操作类型\方法名称 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
获取队首元素 | element | peak | - | - |
其中,抛出异常是指阻塞队列执行操作时,会抛出异常。具体来说,当阻塞队列满时,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取或删除元素时会抛出NoSuchElementException异常 。
返回特殊值是指阻塞队列执插入或移除操作时,均会返回特殊值。具体来说,插入方法会返回是否成功,成功则返回true。移除方法会从队列里拿出一个元素,如果没有则返回null。
一直阻塞是指线程在使用阻塞队列执行附加操作时,会阻塞。具体来说,当阻塞队列满时,如果生产者线程继续向阻塞队列里put元素,阻塞队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,如果消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
超时退出是指线程在使用阻塞队列执行附加操作时,阻塞一定时间后,会退出。具体来说,当阻塞队列满时,如果生产者线程继续向阻塞队列里offer元素,阻塞队列会阻塞生产者线程一段时间,如果超过一定的时间,队列仍满,生产者线程就会退出。当队列空时,如果消费者线程试图从队列里poll元素,阻塞队列也会阻塞消费者线程一段时间,如果超过一定的时间,队列仍空,消费者线程就会退出。
ArrayBlockingQueue
ArrayBlockingQueue是一个用数组实现的固定的(容量一旦设置不能改变,不支持动态扩容)有界阻塞队列。ArrayBlockingQueue支持公平和非公平访问,且默认是非公平访问(参考JDK 1.8源码)。
每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程的请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是指当前等待时间最长的线程先获取锁
针对插入和移除操作,ArrayBlockingQueue提供了多种实现,这里重点分析下put方法和take方法,线程在调用put方法向阻塞队列插入数据时,如果检测到阻塞队列已满,则会一直等待,直到队列非满或者响应中断退出。关键代码如下:
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e); // 检查元素是否为null,如果是null则抛出NullPointerException
final ReentrantLock lock = this.lock; // 获取队列的锁
lock.lockInterruptibly(); // 获取锁,允许在等待锁的过程中被中断
try {
while (count == items.length) // 如果队列已满
notFull.await(); // 等待队列不满的信号
enqueue(e); // 将元素插入队列
} finally {
lock.unlock(); // 释放锁
}
}
private void enqueue(E e) {
final Object[] items = this.items;
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++; // 增加队列中元素的数量
notEmpty.signal(); // 唤醒等待在notEmpty条件上的线程
}
在执行put方法时,如果队列已满,那么当前线程将会被添加到notFull Condition对象的等待队列中,直到队列非满时,才会唤醒执行添加操作。如果队列没有满,那么就直接调用enqueue(e)方法将元素加入到队列中。 所以,可以推断,当在阻塞队列上执行移除元素操作时,在移除成功的同时会唤醒notFull Condition对象的等待队列的线程。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock; // 获取队列的锁
lock.lockInterruptibly(); // 获取锁,允许在等待锁的过程中被中断
try {
while (count == 0) // 如果队列为空
notEmpty.await(); // 等待队列不空的信号
return dequeue(); // 从队列中移除并返回元素
} finally {
lock.unlock(); // 释放锁
}
}
private E dequeue() {
@SuppressWarnings("unchecked")
E e = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); // 唤醒等待在notFull条件上的线程
return e;
}
在执行take方法时,如果队列已空,那么当前线程将会被添加到notEmpty Condition对象的等待队列中,直到队列非空时,才会唤醒执行移除操作。如果队列不为空,那么就直接调用dequeue()方法从队头移除元素。 所以,可以推断,当在阻塞队列上执行添加元素操作时,在添加成功的同时会唤醒notEmpty Condition对象的等待队列的线程。
SynchronousQueue
SynchronousQueue是一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁(默认是非公平)。SynchronousQueue的一个使用场景是在线程池里。
SynchronousQueue 的公平模式(FIFO,先进先出)可以减少了线程之间的冲突,在竞争频繁的场景下反而具备更高的性能,而非公平模式(LIFO,后进先出)能够更好的维持线程局部性(thread locality),减少线程上下文切换的开销。
LinkedBlockingQueue
LinkedBlockingQueue是一个由链表结构组成的有界队列,此队列的默认最大长度为Integer.MAX_VALUE(可将其看成无界队列)。在使用LinkedBlockingQueue时,也可以在初始化时指定其容量。 LinkedBlockingQueue队列按照先进先出(FIFO)的顺序进行排序。
因为实现了BlockingQueue接口,所以针对插入和移除操作,LinkedBlockingQueue同样提供了多种实现。与ArrayBlockingQueue一致,这里重点分析下put方法和take方法(阻塞的调用方式较常用)。
public void put(E e) throws InterruptedException {
// 检查插入的元素是否为null,如果为null则抛出NullPointerException
if (e == null) throw new NullPointerException();
final int c; // 用于记录当前队列中的元素数量
final Node<E> node = new Node<E>(e); // 创建一个新的节点,存储要插入的元素
final ReentrantLock putLock = this.putLock; // 获取putLock,用于控制插入操作的锁
final AtomicInteger count = this.count; // 获取count,记录队列中的元素数量
// 获取putLock锁,允许在等待锁的过程中响应中断
putLock.lockInterruptibly();
try {
// 如果队列已满(元素数量等于容量),则当前线程进入等待状态
while (count.get() == capacity) {
notFull.await(); // 等待notFull条件(队列未满)的信号
}
// 将新节点插入队列尾部
enqueue(node);
// 获取当前队列中的元素数量,并将其加1
c = count.getAndIncrement();
// 如果插入元素后队列仍未满,则唤醒其他可能正在等待插入的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
// 无论插入操作是否成功,最终都会释放putLock锁
putLock.unlock();
}
// 如果插入元素前队列为空(c == 0),则唤醒可能正在等待取元素的线程
if (c == 0)
signalNotEmpty();
}
// 将新节点插入队列尾部的方法
private void enqueue(Node<E> node) {
// 将新节点添加到链表尾部,并更新last指针
last = last.next = node;
}
分析LinkedBlockingQueue的put方法源码可以发现,其处理流程与ArrayBlockingQueue的思路一致:线程在调用put方法向阻塞队列插入数据时,如果检测到阻塞队列已满,则会一直等待,直到队列非满或者响应中断退出。
public E take() throws InterruptedException {
final E x; // 用于存储从队列中取出的元素
final int c; // 用于记录当前队列中的元素数量
final AtomicInteger count = this.count; // 获取队列的元素计数器
final ReentrantLock takeLock = this.takeLock; // 获取takeLock,用于控制取操作的锁
// 获取takeLock锁,允许在等待锁的过程中响应中断
takeLock.lockInterruptibly();
try {
// 如果队列为空(元素数量为0),则当前线程进入等待状态
while (count.get() == 0) {
notEmpty.await(); // 等待notEmpty条件(队列不为空)的信号
}
// 从队列头部取出元素
x = dequeue();
// 获取当前队列中的元素数量,并将其减1
c = count.getAndDecrement();
// 如果取出元素后队列仍不为空,则唤醒其他可能正在等待取元素的线程
if (c > 1)
notEmpty.signal();
} finally {
// 无论取操作是否成功,最终都会释放takeLock锁
takeLock.unlock();
}
// 如果取出元素前队列已满(c == capacity),则唤醒可能正在等待插入的线程
if (c == capacity)
signalNotFull();
// 返回取出的元素
return x;
}
private E dequeue() {
// 获取头节点(哨兵节点)和第一个实际存储元素的节点
Node<E> h = head; // h 是哨兵节点
Node<E> first = h.next; // first 是第一个实际存储元素的节点
// 将哨兵节点的next指向自己,帮助GC回收
h.next = h; // 将哨兵节点的next指向自己,断开与链表的连接
// 更新head指针,使其指向新的头节点(first)
head = first;
// 获取第一个实际节点的元素值
E x = first.item;
// 将第一个实际节点的item置为null,帮助GC回收
first.item = null;
// 返回取出的元素
return x;
}
线程在调用take方法从阻塞队列移除数据时,如果检测到阻塞队列已空,则会一直等待,直到队列非空或者响应中断退出。
评论区