侧边栏壁纸
博主头像
MD_Tech_博客

行动起来,活在当下

  • 累计撰写 8 篇文章
  • 累计创建 8 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

BlockingQueue接口及其常用实现类

Siuux_7
2025-03-08 / 0 评论 / 0 点赞 / 5 阅读 / 0 字

BlockingQueue的实现类

在 Java 并发编程中,BlockingQueuejava.util.concurrent 包下的一个关键接口,主要用于在多线程环境中安全地传递数据。它提供了阻塞操作,能够有效地控制生产者-消费者模型。本文将介绍 BlockingQueue 在 Java 集合框架中的层级结构,并深入分析其主要实现类:ArrayBlockingQueueSynchronousQueueLinkedBlockingQueue

BlockingQueue接口层级结构.png

从层级结构可以看出,BlockingQueue 继承自 Queue,同时也是 Collection 接口的一部分。它的主要特点是提供了线程安全的阻塞方法,如 put()take(),以控制数据的流动。

BlockingQueue

BlockingQueue接口都提供了以下几个方法来操作队列

操作类型\方法名称 抛出异常 返回特殊值 一直阻塞 超时退出
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
获取队首元素 element peak - -

其中,抛出异常是指阻塞队列执行操作时,会抛出异常。具体来说,当阻塞队列满时,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取或删除元素时会抛出NoSuchElementException异常 。

返回特殊值是指阻塞队列执插入或移除操作时,均会返回特殊值。具体来说,插入方法会返回是否成功,成功则返回true。移除方法会从队列里拿出一个元素,如果没有则返回null。

一直阻塞是指线程在使用阻塞队列执行附加操作时,会阻塞。具体来说,当阻塞队列满时,如果生产者线程继续向阻塞队列里put元素,阻塞队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,如果消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。

超时退出是指线程在使用阻塞队列执行附加操作时,阻塞一定时间后,会退出。具体来说,当阻塞队列满时,如果生产者线程继续向阻塞队列里offer元素,阻塞队列会阻塞生产者线程一段时间,如果超过一定的时间,队列仍满,生产者线程就会退出。当队列空时,如果消费者线程试图从队列里poll元素,阻塞队列也会阻塞消费者线程一段时间,如果超过一定的时间,队列仍空,消费者线程就会退出。

ArrayBlockingQueue

ArrayBlockingQueue是一个用数组实现的固定的(容量一旦设置不能改变,不支持动态扩容)有界阻塞队列。ArrayBlockingQueue支持公平和非公平访问,且默认是非公平访问(参考JDK 1.8源码)。

每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程的请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是指当前等待时间最长的线程先获取锁

针对插入和移除操作,ArrayBlockingQueue提供了多种实现,这里重点分析下put方法和take方法,线程在调用put方法向阻塞队列插入数据时,如果检测到阻塞队列已满,则会一直等待,直到队列非满或者响应中断退出。关键代码如下:

public void put(E e) throws InterruptedException {  
    Objects.requireNonNull(e);  // 检查元素是否为null,如果是null则抛出NullPointerException
    final ReentrantLock lock = this.lock;  // 获取队列的锁
    lock.lockInterruptibly();  // 获取锁,允许在等待锁的过程中被中断
    try {  
        while (count == items.length)  // 如果队列已满
            notFull.await();  // 等待队列不满的信号
        enqueue(e);  // 将元素插入队列
    } finally {  
        lock.unlock();  // 释放锁
    }  
}

private void enqueue(E e) {
    final Object[] items = this.items;
    items[putIndex] = e;  
    if (++putIndex == items.length) putIndex = 0;  
    count++;  // 增加队列中元素的数量
    notEmpty.signal();  // 唤醒等待在notEmpty条件上的线程
}

在执行put方法时,如果队列已满,那么当前线程将会被添加到notFull Condition对象的等待队列中,直到队列非满时,才会唤醒执行添加操作。如果队列没有满,那么就直接调用enqueue(e)方法将元素加入到队列中。 所以,可以推断,当在阻塞队列上执行移除元素操作时,在移除成功的同时会唤醒notFull Condition对象的等待队列的线程。

public E take() throws InterruptedException {  
    final ReentrantLock lock = this.lock;  // 获取队列的锁
    lock.lockInterruptibly();  // 获取锁,允许在等待锁的过程中被中断
    try {  
        while (count == 0)  // 如果队列为空
            notEmpty.await();  // 等待队列不空的信号
        return dequeue();  // 从队列中移除并返回元素
    } finally {  
        lock.unlock();  // 释放锁
    }  
}

private E dequeue() {  
    @SuppressWarnings("unchecked")  
    E e = (E) items[takeIndex];  
    items[takeIndex] = null;  
    if (++takeIndex == items.length) takeIndex = 0;  
    count--;  
    if (itrs != null)  
        itrs.elementDequeued();  
    notFull.signal();  // 唤醒等待在notFull条件上的线程
    return e;  
}

在执行take方法时,如果队列已空,那么当前线程将会被添加到notEmpty Condition对象的等待队列中,直到队列非空时,才会唤醒执行移除操作。如果队列不为空,那么就直接调用dequeue()方法从队头移除元素。 所以,可以推断,当在阻塞队列上执行添加元素操作时,在添加成功的同时会唤醒notEmpty Condition对象的等待队列的线程。

SynchronousQueue

SynchronousQueue是一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁(默认是非公平)。SynchronousQueue的一个使用场景是在线程池里。

SynchronousQueue 的公平模式(FIFO,先进先出)可以减少了线程之间的冲突,在竞争频繁的场景下反而具备更高的性能,而非公平模式(LIFO,后进先出)能够更好的维持线程局部性(thread locality),减少线程上下文切换的开销。

LinkedBlockingQueue

LinkedBlockingQueue是一个由链表结构组成的有界队列,此队列的默认最大长度为Integer.MAX_VALUE(可将其看成无界队列)。在使用LinkedBlockingQueue时,也可以在初始化时指定其容量。 LinkedBlockingQueue队列按照先进先出(FIFO)的顺序进行排序。

因为实现了BlockingQueue接口,所以针对插入和移除操作,LinkedBlockingQueue同样提供了多种实现。与ArrayBlockingQueue一致,这里重点分析下put方法和take方法(阻塞的调用方式较常用)。

public void put(E e) throws InterruptedException {
    //  检查插入的元素是否为null,如果为null则抛出NullPointerException
    if (e == null) throw new NullPointerException();

    final int c; // 用于记录当前队列中的元素数量
    final Node<E> node = new Node<E>(e); // 创建一个新的节点,存储要插入的元素
    final ReentrantLock putLock = this.putLock; // 获取putLock,用于控制插入操作的锁
    final AtomicInteger count = this.count; // 获取count,记录队列中的元素数量

    //  获取putLock锁,允许在等待锁的过程中响应中断
    putLock.lockInterruptibly();
    try {
        //  如果队列已满(元素数量等于容量),则当前线程进入等待状态
        while (count.get() == capacity) {
            notFull.await(); // 等待notFull条件(队列未满)的信号
        }

        //  将新节点插入队列尾部
        enqueue(node);

        //  获取当前队列中的元素数量,并将其加1
        c = count.getAndIncrement();

        //  如果插入元素后队列仍未满,则唤醒其他可能正在等待插入的线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        //  无论插入操作是否成功,最终都会释放putLock锁
        putLock.unlock();
    }

    //  如果插入元素前队列为空(c == 0),则唤醒可能正在等待取元素的线程
    if (c == 0)
        signalNotEmpty();
}

//  将新节点插入队列尾部的方法
private void enqueue(Node<E> node) {
    // 将新节点添加到链表尾部,并更新last指针
    last = last.next = node;
}

分析LinkedBlockingQueue的put方法源码可以发现,其处理流程与ArrayBlockingQueue的思路一致:线程在调用put方法向阻塞队列插入数据时,如果检测到阻塞队列已满,则会一直等待,直到队列非满或者响应中断退出。

public E take() throws InterruptedException {
    final E x; // 用于存储从队列中取出的元素
    final int c; // 用于记录当前队列中的元素数量
    final AtomicInteger count = this.count; // 获取队列的元素计数器
    final ReentrantLock takeLock = this.takeLock; // 获取takeLock,用于控制取操作的锁

    //  获取takeLock锁,允许在等待锁的过程中响应中断
    takeLock.lockInterruptibly();
    try {
        //  如果队列为空(元素数量为0),则当前线程进入等待状态
        while (count.get() == 0) {
            notEmpty.await(); // 等待notEmpty条件(队列不为空)的信号
        }

        //  从队列头部取出元素
        x = dequeue();

        //  获取当前队列中的元素数量,并将其减1
        c = count.getAndDecrement();

        //  如果取出元素后队列仍不为空,则唤醒其他可能正在等待取元素的线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        //  无论取操作是否成功,最终都会释放takeLock锁
        takeLock.unlock();
    }

    //  如果取出元素前队列已满(c == capacity),则唤醒可能正在等待插入的线程
    if (c == capacity)
        signalNotFull();

    //  返回取出的元素
    return x;
}

private E dequeue() {
    //  获取头节点(哨兵节点)和第一个实际存储元素的节点
    Node<E> h = head; // h 是哨兵节点
    Node<E> first = h.next; // first 是第一个实际存储元素的节点

    //  将哨兵节点的next指向自己,帮助GC回收
    h.next = h; // 将哨兵节点的next指向自己,断开与链表的连接

    //  更新head指针,使其指向新的头节点(first)
    head = first;

    //  获取第一个实际节点的元素值
    E x = first.item;

    //  将第一个实际节点的item置为null,帮助GC回收
    first.item = null;

    //  返回取出的元素
    return x;
}

线程在调用take方法从阻塞队列移除数据时,如果检测到阻塞队列已空,则会一直等待,直到队列非空或者响应中断退出。

0

评论区