当前位置: 首页 > 图灵资讯 > 技术篇> 【Java数据结构及算法实战】系列009:Java队列03——数组实现的阻塞队列ArrayBlockingQueue

【Java数据结构及算法实战】系列009:Java队列03——数组实现的阻塞队列ArrayBlockingQueue

来源:图灵教育
时间:2023-06-05 09:35:16

顾名思义,ArrayBlockingQueueue是一个基于数组的有界阻塞队列。该队列对元素进行了FIFO排序。队列的第一个元素是在队列中停留时间最长的元素。队列的尾部是在队列中停留时间最短的元素。新元素插入队列的尾部,队列检索操作获取队列头部的元素。

ArrayBlockingQueue是一个经典的“有界缓冲区”(bounded buffer)它包含一个固定大小的数组,用于携带包含生产者插入和消费者提取的元素。一旦创建了ArrayBlockingQueue的容量,就不能改变。试图将一个元素放入一个满队列会导致操作堵塞;试图从空队列中取出一个元素也会被堵塞。

ArrayBlockingQueueue支持排名的可选公平策略,用于等待生产者和消费者的线程。默认情况下,此顺序不保证。然而,由公平性设置为true结构的队列允许线程按FIFO顺序访问。公平性通常会减少吞吐量,但可以减少可变性,避免线程饥饿。

ArrayBlockingQueue类及其迭代器实现了Colection和Iterator接口的所有可选方法。ArayBlockingQueue是Java Collections Framework成员。

1.ArrayBlockingQueue的声明

ArrayBlockingQueue的接口和继承关系如下

public classArrayBlockingQueue<E> extends AbstractQueue<E>        implements BlockingQueue<E>, java.io.Serializable {    …}

完整的接口继承关系如下图所示。

【Java数据结构及算法实战】系列009:Java队列03——数组实现的阻塞队列ArrayBlockingQueue_ci

从上述代码可以看出,ArrayBlockingQueue既实现了BlockingQueue<E>和java.io.Serializable接口,继承java.util.AbstractQueue<E>。其中,AbstractQueue是Queue接口的抽象类,核心代码如下。

package java.util; public abstract class AbstractQueue<E>    extends AbstractCollection<E>    implements Queue<E> {     protected AbstractQueue() {    }     public boolean add(E e) {        if (offer(e))            return true;        else            throw new IllegalStateException("Queue full");    }     public E remove() {        E x = poll();        if (x != null)            return x;        else            throw new NoSuchElementException();    }     public E element() {        E x = peek();        if (x != null)            return x;        else            throw new NoSuchElementException();    }     public void clear() {        while (poll() != null)            ;    }     public boolean addAll(Collection<? extends E> c) {        if (c == null)            throw new NullPointerException();        if (c == this)            throw new IllegalArgumentException();        boolean modified = false;        for (E e : c)            if (add(e))modified = true;        return modified;    } }

2.ArrayBlockingQueue的成员变量和构造函数

以下是ArrayBlockingQueue的结构函数和成员变量。

// 元素数组    final Object[] items; // takeee消费索引、poll、操作peek或remove    int takeIndex; // 生产索引,用于put、操作offer或add操作    int putIndex; // 队列中的元素数    int count; /*     * 使用经典的双条件算法(two-condition algorithm)并发控制的实现     */ // 操作数组确保原子锁    finalReentrantLocklock; // 数组非空,唤醒消费者    private final Condition notEmpty; // 数组非满,唤醒生产者privateteet final Condition notFull; // 迭代器状态transienttenttent Itrs itrs; public ArrayBlockingQueue(int capacity) {        this(capacity, false);    }     public ArrayBlockingQueue(int capacity, boolean fair) {        if (capacity <= 0)            throw new IllegalArgumentException();        this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull =  lock.newCondition();    }     public ArrayBlockingQueue(int capacity, boolean fair,                              Collection<? extends E> c) {        this(capacity, fair);         final ReentrantLock lock = this.lock;lock.lock(); // 只锁可见,不互斥        try {            final Object[] items = this.items;            int i = 0;            try {                for (E e : c)items[i++] = Objects.requireNonNull(e);            } catch (ArrayIndexOutOfBoundsException ex) {                throw new IllegalArgumentException();            }count = i;putIndex = (i == capacity) ? 0 : i;        } finally {lock.unlock();  // 解锁        }}

从上述代码可以看出,构造函数有三种。构造函数中的参数含义如下

lcapacity用于设置队列容量

lfair用于设置访问策略。如果是true,则在插入或删除线程时阻塞线程访问,则按FIFO顺序处理;如果是false,则未指定访问顺序

LC用于设置最初包含给定集合的元素,并按集合迭代器的遍历顺序添加

类成员items是用来存储队列中元素的数组。关键字final指出,当ArrayBlockingQueue结构完成后,通过newe Object[capacity]初始化items数组完成后,后续items的容量将不再改变。

通过Reentrantlock实现了访问策略。notempty通过两个加锁条件、实现并发控制的notFull。这是典型的双条件算法(two-condition algorithm)。

ArrayBlockingQueue生产增加putindex,消费增加takeindex。

Itrs用于记录当前活动迭代器的共享状态。如果已知没有迭代器,则为null。允许队列操作更新迭代器状态。迭代器状态不是本节的重点,不再深入讨论。

3.ArrayBlockingQueue的核心方法

以下解释了ArrayBlockingQueue常用核心方法的实现原理。

3.1.offer(e)

执行offer(e)方法后有两个结果

l队列未满时,返回true

当l队列满时,返回false

ArrayBlockingQueueoffer (e)方法源代码如下:

public booleanoffer(E e) {        Objects.requireNonNull(e);        final ReentrantLock lock = this.lock;lock.lock(); // 加锁        try {            if (count == items.length)                return false;            else {                enqueue(e); // 入队                return true;            }        } finally {lock.unlock();  // 解锁        }}

从上面的代码可以看出,执行offer(e)方法分为以下步骤:

l为保证并发操作的安全,先加锁。

然后判断count是否与数组items的长度一致。如果一致,则表明队列已满,直接返回false;否则,将执行enqueueue(e)方法做元素入队,返回true。

l终于解锁了。

enqueue(e)方法源代码如下:

private voidenqueue(E e) {        final Object[] items = this.items;items[putIndex] = e;        if (++putIndex == items.length) putIndex = 0;count++;notEmpty.signal(); // 唤醒等待中的线程}

以上代码相对简单,在当前索引中(putIndex)将待入队的元素放在位置,然后putindex和count分别增加,并通过signal()唤醒等待中的线程。其中一点是,当putindex等于几组items的长度时,putindex放置为0。

思考:当putindex等于数组items的长度时,为什么putindex是0?

3.2.put(e)

执行put(e)方法后有两个结果:

l队列未满时,直接插入未返回值

l队列满时,会堵塞等待,直到队列不满时再插入

ArrayBlockingQueueput (e)方法源代码如下:

public voidput(E e) throws InterruptedException {        Objects.requireNonNull(e);        final ReentrantLock lock = this.lock;lock.lockInterruptibly();  // 获取锁        try {            while (count == items.length)notFull.await();  // 使线程等待            enqueue(e);  // 入队        } finally {lock.unlock();  // 解锁        }    }

从上面的代码可以看出,put(e)该方法的实现分为以下步骤:

首先要获得锁。

然后判断count是否与数组items的长度一致。如果一致,则表明队列已满,则等待;否则,执行enqueue(e)做元素入队的方法。

l终于解锁了。

3.3.offer(e,time,unit)

offer(e,time,unit)方法和offer(e)不同的方法是前者增加了等待机制。设置等待时间。如果您不能在指定时间内将数据插入队列,请返回false。执行offer(e,time,unit)方法有两个结果:

l队列未满时,返回true

当l队列满时,它会阻止等待。如果数据不能在指定时间内插入队列,则返回falsee

ArrayBlockingQueueput (e)方法源代码如下:

public boolean offer(E e, long timeout, TimeUnit unit)        throws InterruptedException {         Objects.requireNonNull(e);        long nanos = unit.toNanos(timeout);        final ReentrantLock lock = this.lock;lock.lockInterruptibly();  // 获取锁        try {            while (count == items.length) {                if (nanos <= 0L)                    return false;nanos = notFull.awaitNanos(nanos);  // 让线程等待指定的时间            }            enqueue(e);            return true;        } finally {lock.unlock();  // 解锁        }    }

从上面的代码可以看出,offer(e,time,unit)该方法的实现分为以下步骤:

首先要获得锁。

然后判断count是否与数组items的长度一致。如果一致,则表明队列已满,则等待;否则,执行enqueue(e)做元素入队的方法。

l终于解锁了。

3.4.add(e)

执行add(e)方法后有两个结果

l队列未满时,返回true

l队列满时,抛出异常

ArrayBlockingQueueadd(e)方法源代码如下:

public booleanadd(E e) {        return super.add(e);    }

从上面的代码可以看出,add(e)该方法的实现直接调用了父类AbstractQue的addd(e)方法。AbstractQueaddd(e)方法源代码如下:

public booleanadd(E e) {        if (offer(e))            return true;        else            throw new IllegalStateException("Queue full");}

从上面的代码可以看出,add(e)该方法还调用了offer(e)方法。offer(e)这里就不赘述方法了。

3.5.poll ()

执行poll ()方法后有两个结果:

当l队列不是空的时候,返回队首值并移除

l队列空时返回nulll

ArrayBlockingQueuepoll()方法源代码如下:

publicpoll() {        final ReentrantLock lock = this.lock;lock.lock();  // 加锁        try {            return (count == 0) ? null : dequeue(); // 出队        } finally {lock.unlock();  // 解锁        }    }

从以上代码可以看出,poll()方法的执行分为以下步骤:

l为保证并发操作的安全,先加锁。

然后判断count是否等于0,如果等于0,则证明队列为空,直接返回nulll;否则,执行dequeue()方法作为元素出队。

l终于解锁了。

dequeue()方法源代码如下:

privatedequeue() {        final Object[] items = this.items;@SuppressWarnings("unchecked")        E e = (E) items[takeIndex];items[takeIndex] = null;  // 删除数据        if (++takeIndex == items.length) takeIndex = 0;count--;        if (itrs != null)itrs.elementDequeued();notFull.signal(); // 唤醒等待中的线程        return e;}

以上代码相对简单,在当前索引中(takeIndex)取出要出队的元素,删除队列中的元素,然后takeindex增加count递减,通过signal()唤醒等待中的线程。其中一点是,当takeindex等于几组items的长度时,takeindex位于0。

3.6.take()

执行take()方法后有两个结果:

当l队列不是空的时候,返回队首值并移除

l队列空的时候,会阻止等待,等到队列不空的时候再回队首值。

ArrayBlockingQueuetake ()方法源代码如下:

publictake() throws InterruptedException {        final ReentrantLock lock = this.lock;lock.lockInterruptibly();  // 获取锁        try {            while (count == 0)notEmpty.await(); // 使线程等待            return dequeue();  // 出队        } finally {lock.unlock();  // 解锁        }    }

从上述代码可以看出,执行take()方法时,分为以下步骤:

首先要获得锁。

然后判断count是否是否则等于0,如果等于0则证明队列为空,则会阻止等待;否则,执行dequeue()方法作为元素出队。

l终于解锁了。

dequeue()这里不再重复方法。

3.7.poll(time,unit)

poll(time,unit)该方法不同于poll()方法,前者增加了等待机制。设置等待时间,如果队列在指定时间内仍然空,则返回null。执行poll(time,unit)方法后有两个结果:

当l队列不是空的时候,返回队首值并移除

l队列空时,会阻止等待。如果队列在指定时间内仍然空,则返回nulll

ArrayBlockingQueuepoll(time,unit)方法源代码如下:

publicpoll(long timeout, TimeUnit unit) throws InterruptedException {        long nanos = unit.toNanos(timeout);        final ReentrantLock lock = this.lock;lock.lockInterruptibly();  // 获取锁        try {            while (count == 0) {                if (nanos <= 0L)                    return null;nanos = notEmpty.awaitNanos(nanos); // 让线程等待指定的时间            }            return dequeue();  // 出队        } finally {lock.unlock();  // 解锁        }    }

从上面的代码可以看出,执行polll(time,unit)方法分为以下步骤:

首先要获得锁。

然后判断count是否等于0,如果等于0,则证明队列为空,将阻止等待;否则,执行dequeue()方法作为元素出队。

l终于解锁了。

dequeue()这里不再重复方法。

3.8.remove()

实施remove()方法后有两个结果:

当l队列不是空的时候,返回队首值并移除

L队列为空时,抛出异常

ArrayBlockingQueueremove()方法实际上是调用父类AbstractQueremove()方法,源码如下:

publicremove() {        E x = poll();        if (x != null)            return x;        else            throw new NoSuchElementException();}

从上面的代码可以看出,remove()直接调用poll()方法。如果poll()方法返回结果为null,则抛出nosuchelementexception异常。

poll()这里不再重复方法。

3.9.peek()

实施peek()方法后有两个结果:

当l队列不是空的时候,返回队的首值不会被移除

l队列空时返回nulll

peek()方法源代码如下:

publicpeek() {        final ReentrantLock lock = this.lock;lock.lock(); // 加锁        try {            return itemAt(takeIndex); // 空则返回null        } finally {lock.unlock();  // 解锁        }} finalitemAt(int i) {        return (E) items[i];}

从上面的代码可以看出,peek()方法比较简单,直接获得了几组中的索引为takeIndex的元素。

3.10.element()

执行element()方法后有两个结果:

当l队列不是空的时候,返回队的首值不会被移除

L队列为空时,抛出异常

element()方法实际上是调用父类AbstractQueelement()方法,源码如下:

publicelement() {        E x = peek();        if (x != null)            return x;        else            throw new NoSuchElementException();}

从上面的代码可以看出,在执行element()方法时,首先要获得peek()方法的结果。如果结果是null,则抛出nosuchelentexception异常。

4.ArrrayBlockingQueue单元测试

ArrayBlockingQueue的单元测试如下:

package com.waylau.java.demo.datastructure; import static org.junit.jupiter.api.Assertions.assertEquals;import static org.junit.jupiter.api.Assertions.assertFalse;import static org.junit.jupiter.api.Assertions.assertNotNull;import static org.junit.jupiter.api.Assertions.assertNull;import static org.junit.jupiter.api.Assertions.assertThrows;import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.NoSuchElementException;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; /** * ArrayBlockingQueue Tests * * @since 1.0.0 2020年5月3日 * @author <a href="https://waylau.com">Way Lau</a> */class ArrayBlockingQueueTests {@Test    void testOffer() {// 初始化队列        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3); // 测试队列未满时,返回 true        boolean resultNotFull = queue.offer("Java");        assertTrue(resultNotFull); // 测试队列满则,返回 falsequeue.offer("C");queue.offer("Python");        boolean resultFull = queue.offer("C++");        assertFalse(resultFull);    } @Test    void testPut() throws InterruptedException {// 初始化队列        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3); // 测试队列未满时,无返回值直接插入;queue.put("Java"); // 测试队列满则, 会阻止等待,直到队列不满。queue.put("C");queue.put("Python");queue.put("C++");  // 阻塞等待    } @Test    void testOfferTime() throws InterruptedException {// 初始化队列        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3); // 测试队列未满时,返回 true        boolean resultNotFull = queue.offer("Java", 5, TimeUnit.SECONDS);        assertTrue(resultNotFull); // 测试队列满则,返回 falsequeue.offer("C");queue.offer("Python");        boolean resultFull = queue.offer("C++", 5, TimeUnit.SECONDS); // 等5秒        assertFalse(resultFull);    } @Test    void testAdd() {// 初始化队列        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3); // 测试队列未满时,返回 true        boolean resultNotFull = queue.add("Java");        assertTrue(resultNotFull); // 测试队列满后,抛出异常queueee.add("C");queue.add("Python");         Throwable excpetion = assertThrows(IllegalStateException.class, () -> {queue.add("C++");// 抛异常        });         assertEquals("Queue full", excpetion.getMessage());    } @Test    void testPoll() throws InterruptedException {// 初始化队列        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3); // 当测试队列为空时,返回 null        String resultEmpty = queue.poll();        assertNull(resultEmpty); // 当测试队列不空时,返回队首值,移除queueee.put("Java");queue.put("C");queue.put("Python");        String resultNotEmpty = queue.poll();        assertEquals("Java", resultNotEmpty);    } @Test    void testTake() throws InterruptedException {// 初始化队列        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3); // 当测试队列不空时,返回队首值,移除queueee.put("Java");queue.put("C");queue.put("Python");        String resultNotEmpty = queue.take();        assertEquals("Java", resultNotEmpty); // 当测试队列为空时,会堵塞等待,一直等到队列不空的时候再回队首值queue.clear();        String resultEmpty = queue.take(); // 阻塞等待        assertNotNull(resultEmpty);    } @Test    void testPollTime() throws InterruptedException {// 初始化队列        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3); // 当测试队列不空时,返回队首值,移除queueee.put("Java");queue.put("C");queue.put("Python");        String resultNotEmpty = queue.poll(5, TimeUnit.SECONDS);        assertEquals("Java", resultNotEmpty); // 当测试队列为空时,会堵塞等待,如果队列在指定时间内仍然是空的,则返回 nullqueue.clear();        String resultEmpty = queue.poll(5, TimeUnit.SECONDS); // 等待5秒        assertNull(resultEmpty);    } @Test    void testRemove() throws InterruptedException {// 初始化队列        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3); // 当测试队列为空时,抛出异常        Throwable excpetion = assertThrows(NoSuchElementException.class, () -> {queue.remove();// 抛异常        });         assertEquals(null, excpetion.getMessage()); // 当测试队列不空时,返回队首值,移除queueee.put("Java");queue.put("C");queue.put("Python");        String resultNotEmpty = queue.remove();        assertEquals("Java", resultNotEmpty);    } @Test    void testPeek() throws InterruptedException {// 初始化队列        Queue<String> queue = new ArrayBlockingQueue<String>(3); // 当测试队列不空时,返回队伍首值,但不移除queueee.add("Java");queue.add("C");queue.add("Python");        String resultNotEmpty = queue.peek();        assertEquals("Java", resultNotEmpty);resultNotEmpty = queue.peek();        assertEquals("Java", resultNotEmpty);resultNotEmpty = queue.peek();        assertEquals("Java", resultNotEmpty); // 当测试队列为空时,回到nulqueeue.clear();        String resultEmpty = queue.peek();        assertNull(resultEmpty);    } @Test    void testElement() throws InterruptedException {// 初始化队列        Queue<String> queue = new ArrayBlockingQueue<String>(3); // 当测试队列不空时,返回队伍首值,但不移除queueee.add("Java");queue.add("C");queue.add("Python");        String resultNotEmpty = queue.element();        assertEquals("Java", resultNotEmpty);resultNotEmpty = queue.element();        assertEquals("Java", resultNotEmpty);resultNotEmpty = queue.element();        assertEquals("Java", resultNotEmpty); // 当测试队列为空时,抛出异常queue.clear();        Throwable excpetion = assertThrows(NoSuchElementException.class, () -> {queue.element();// 抛异常        });         assertEquals(null, excpetion.getMessage());    }}

5.ArrayBlockingQueue的应用案例

以下是生产者-消费者的例子。这个例子模拟了一个生产者和两个消费者。当队列满时,会阻止生产者生产;当队列空时,会阻碍消费者消费。

package com.waylau.java.demo.datastructure; import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue; /** * ArrayBlockingQueue Demo * * @since 1.0.0 2020年5月3日 * @author <a href="https://waylau.com">Way Lau</a> */public class ArrayBlockingQueueDemo {     public static void main(String[] args) {        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);       // 1个生产者        Producer p = new Producer(queue);       // 2个消费者        Consumer c1 = new Consumer("c1", queue);        Consumer c2 = new Consumer("c2", queue);       // 启动线程        new Thread(p).start();        new Thread(c1).start();        new Thread(c2).start();    }} class Producer implements Runnable {    private final BlockingQueue<String> queue;     Producer(BlockingQueue<String> queue) {        this.queue = queue;    }     public void run() {        try {            while (true) {// 模拟耗时操作                Thread.sleep(1000L); queue.put(produce());            }        } catch (InterruptedException ex) {ex.printStackTrace();        }    }     String produce() {        String apple = "apple: " + System.currentTimeMillis();        System.out.println("produce " + apple);        return apple;    }} class Consumer implements Runnable {    private final BlockingQueue<String> queue;     private final String name;     Consumer(String name, BlockingQueue<String> queue) {        this.queue = queue;        this.name = name;    }     public void run() {        try {            while (true) {// 模拟耗时操作                Thread.sleep(2000L);                 consume(queue.take());            }        } catch (InterruptedException ex) {ex.printStackTrace();        }    }     void consume(Object x) {        System.out.println(this.name + " consume " + x);    }}

操作上述程序,输出内容如下:

produce apple: 1590308383034c2 consume apple: 1590308334 apple: 1590308384034c1 consume apple: 1590308344. apple: 1590308385036c2 consume apple: 159030838536 apple: 1590308386036c1 consume apple: 159030836036 apple: 1590308387036c2 consume apple: 159030837036 apple: 1590308388036c1 consume apple: 15903088836 apple: 1590308389041c2 consume apple: 15903088904 apple: 1590308390041c1 consume apple: 15903089004 apple: 1590308391042c2 consume apple: 15903089104 apple: 1590308392042c1 consume apple: 1590308392042

6.参考引用

本系列归档为Java数据结构和算法实战:https://github.com/waylau/java-data-structures-and-algorithms-in-action

上一篇:

Java位运算符的学习

下一篇:

2.进程与线程