顾名思义,ArrayBlockingQueueue是一个基于数组的有界阻塞队列。该队列对元素进行了FIFO排序。队列的第一个元素是在队列中停留时间最长的元素。队列的尾部是在队列中停留时间最短的元素。新元素插入队列的尾部,队列检索操作获取队列头部的元素。
ArrayBlockingQueue是一个经典的“有界缓冲区”(bounded buffer)它包含一个固定大小的数组,用于携带包含生产者插入和消费者提取的元素。一旦创建了ArrayBlockingQueue的容量,就不能改变。试图将一个元素放入一个满队列会导致操作堵塞;试图从空队列中取出一个元素也会被堵塞。
ArrayBlockingQueueue支持排名的可选公平策略,用于等待生产者和消费者的线程。默认情况下,此顺序不保证。然而,由公平性设置为true结构的队列允许线程按FIFO顺序访问。公平性通常会减少吞吐量,但可以减少可变性,避免线程饥饿。
ArrayBlockingQueue类及其迭代器实现了Colection和Iterator接口的所有可选方法。ArayBlockingQueue是Java Collections Framework成员。
1.ArrayBlockingQueue的声明ArrayBlockingQueue的接口和继承关系如下
public classArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { …}
完整的接口继承关系如下图所示。
从上述代码可以看出,ArrayBlockingQueue既实现了BlockingQueue<E>和java.io.Serializable接口,继承java.util.AbstractQueue<E>。其中,AbstractQueue是Queue接口的抽象类,核心代码如下。
package java.util; public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> { protected AbstractQueue() { } public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } public E remove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); } public E element() { E x = peek(); if (x != null) return x; else throw new NoSuchElementException(); } public void clear() { while (poll() != null) ; } public boolean addAll(Collection<? extends E> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); boolean modified = false; for (E e : c) if (add(e))modified = true; return modified; } }
2.ArrayBlockingQueue的成员变量和构造函数以下是ArrayBlockingQueue的结构函数和成员变量。
// 元素数组 final Object[] items; // takeee消费索引、poll、操作peek或remove int takeIndex; // 生产索引,用于put、操作offer或add操作 int putIndex; // 队列中的元素数 int count; /* * 使用经典的双条件算法(two-condition algorithm)并发控制的实现 */ // 操作数组确保原子锁 finalReentrantLocklock; // 数组非空,唤醒消费者 private final Condition notEmpty; // 数组非满,唤醒生产者privateteet final Condition notFull; // 迭代器状态transienttenttent Itrs itrs; public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock;lock.lock(); // 只锁可见,不互斥 try { final Object[] items = this.items; int i = 0; try { for (E e : c)items[i++] = Objects.requireNonNull(e); } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); }count = i;putIndex = (i == capacity) ? 0 : i; } finally {lock.unlock(); // 解锁 }}
从上述代码可以看出,构造函数有三种。构造函数中的参数含义如下
lcapacity用于设置队列容量
lfair用于设置访问策略。如果是true,则在插入或删除线程时阻塞线程访问,则按FIFO顺序处理;如果是false,则未指定访问顺序
LC用于设置最初包含给定集合的元素,并按集合迭代器的遍历顺序添加
类成员items是用来存储队列中元素的数组。关键字final指出,当ArrayBlockingQueue结构完成后,通过newe Object[capacity]初始化items数组完成后,后续items的容量将不再改变。
通过Reentrantlock实现了访问策略。notempty通过两个加锁条件、实现并发控制的notFull。这是典型的双条件算法(two-condition algorithm)。
ArrayBlockingQueue生产增加putindex,消费增加takeindex。
Itrs用于记录当前活动迭代器的共享状态。如果已知没有迭代器,则为null。允许队列操作更新迭代器状态。迭代器状态不是本节的重点,不再深入讨论。
3.ArrayBlockingQueue的核心方法以下解释了ArrayBlockingQueue常用核心方法的实现原理。
3.1.offer(e)执行offer(e)方法后有两个结果
l队列未满时,返回true
当l队列满时,返回false
ArrayBlockingQueueoffer (e)方法源代码如下:
public booleanoffer(E e) { Objects.requireNonNull(e); final ReentrantLock lock = this.lock;lock.lock(); // 加锁 try { if (count == items.length) return false; else { enqueue(e); // 入队 return true; } } finally {lock.unlock(); // 解锁 }}
从上面的代码可以看出,执行offer(e)方法分为以下步骤:
l为保证并发操作的安全,先加锁。
然后判断count是否与数组items的长度一致。如果一致,则表明队列已满,直接返回false;否则,将执行enqueueue(e)方法做元素入队,返回true。
l终于解锁了。
enqueue(e)方法源代码如下:
private voidenqueue(E e) { final Object[] items = this.items;items[putIndex] = e; if (++putIndex == items.length) putIndex = 0;count++;notEmpty.signal(); // 唤醒等待中的线程}
以上代码相对简单,在当前索引中(putIndex)将待入队的元素放在位置,然后putindex和count分别增加,并通过signal()唤醒等待中的线程。其中一点是,当putindex等于几组items的长度时,putindex放置为0。
思考:当putindex等于数组items的长度时,为什么putindex是0?
3.2.put(e)执行put(e)方法后有两个结果:
•
l队列未满时,直接插入未返回值
l队列满时,会堵塞等待,直到队列不满时再插入
ArrayBlockingQueueput (e)方法源代码如下:
public voidput(E e) throws InterruptedException { Objects.requireNonNull(e); final ReentrantLock lock = this.lock;lock.lockInterruptibly(); // 获取锁 try { while (count == items.length)notFull.await(); // 使线程等待 enqueue(e); // 入队 } finally {lock.unlock(); // 解锁 } }
从上面的代码可以看出,put(e)该方法的实现分为以下步骤:
首先要获得锁。
然后判断count是否与数组items的长度一致。如果一致,则表明队列已满,则等待;否则,执行enqueue(e)做元素入队的方法。
l终于解锁了。
3.3.offer(e,time,unit)offer(e,time,unit)方法和offer(e)不同的方法是前者增加了等待机制。设置等待时间。如果您不能在指定时间内将数据插入队列,请返回false。执行offer(e,time,unit)方法有两个结果:
•
l队列未满时,返回true
当l队列满时,它会阻止等待。如果数据不能在指定时间内插入队列,则返回falsee
ArrayBlockingQueueput (e)方法源代码如下:
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { Objects.requireNonNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock;lock.lockInterruptibly(); // 获取锁 try { while (count == items.length) { if (nanos <= 0L) return false;nanos = notFull.awaitNanos(nanos); // 让线程等待指定的时间 } enqueue(e); return true; } finally {lock.unlock(); // 解锁 } }
从上面的代码可以看出,offer(e,time,unit)该方法的实现分为以下步骤:
首先要获得锁。
然后判断count是否与数组items的长度一致。如果一致,则表明队列已满,则等待;否则,执行enqueue(e)做元素入队的方法。
l终于解锁了。
3.4.add(e)执行add(e)方法后有两个结果
l队列未满时,返回true
l队列满时,抛出异常
ArrayBlockingQueueadd(e)方法源代码如下:
public booleanadd(E e) { return super.add(e); }
从上面的代码可以看出,add(e)该方法的实现直接调用了父类AbstractQue的addd(e)方法。AbstractQueaddd(e)方法源代码如下:
public booleanadd(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full");}
从上面的代码可以看出,add(e)该方法还调用了offer(e)方法。offer(e)这里就不赘述方法了。
3.5.poll ()执行poll ()方法后有两个结果:
当l队列不是空的时候,返回队首值并移除
l队列空时返回nulll
ArrayBlockingQueuepoll()方法源代码如下:
publicpoll() { final ReentrantLock lock = this.lock;lock.lock(); // 加锁 try { return (count == 0) ? null : dequeue(); // 出队 } finally {lock.unlock(); // 解锁 } }
从以上代码可以看出,poll()方法的执行分为以下步骤:
l为保证并发操作的安全,先加锁。
然后判断count是否等于0,如果等于0,则证明队列为空,直接返回nulll;否则,执行dequeue()方法作为元素出队。
l终于解锁了。
dequeue()方法源代码如下:
privatedequeue() { final Object[] items = this.items;@SuppressWarnings("unchecked") E e = (E) items[takeIndex];items[takeIndex] = null; // 删除数据 if (++takeIndex == items.length) takeIndex = 0;count--; if (itrs != null)itrs.elementDequeued();notFull.signal(); // 唤醒等待中的线程 return e;}
以上代码相对简单,在当前索引中(takeIndex)取出要出队的元素,删除队列中的元素,然后takeindex增加count递减,通过signal()唤醒等待中的线程。其中一点是,当takeindex等于几组items的长度时,takeindex位于0。
3.6.take()执行take()方法后有两个结果:
当l队列不是空的时候,返回队首值并移除
l队列空的时候,会阻止等待,等到队列不空的时候再回队首值。
ArrayBlockingQueuetake ()方法源代码如下:
publictake() throws InterruptedException { final ReentrantLock lock = this.lock;lock.lockInterruptibly(); // 获取锁 try { while (count == 0)notEmpty.await(); // 使线程等待 return dequeue(); // 出队 } finally {lock.unlock(); // 解锁 } }
从上述代码可以看出,执行take()方法时,分为以下步骤:
首先要获得锁。
然后判断count是否是否则等于0,如果等于0则证明队列为空,则会阻止等待;否则,执行dequeue()方法作为元素出队。
l终于解锁了。
dequeue()这里不再重复方法。
3.7.poll(time,unit)poll(time,unit)该方法不同于poll()方法,前者增加了等待机制。设置等待时间,如果队列在指定时间内仍然空,则返回null。执行poll(time,unit)方法后有两个结果:
当l队列不是空的时候,返回队首值并移除
l队列空时,会阻止等待。如果队列在指定时间内仍然空,则返回nulll
ArrayBlockingQueuepoll(time,unit)方法源代码如下:
publicpoll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock;lock.lockInterruptibly(); // 获取锁 try { while (count == 0) { if (nanos <= 0L) return null;nanos = notEmpty.awaitNanos(nanos); // 让线程等待指定的时间 } return dequeue(); // 出队 } finally {lock.unlock(); // 解锁 } }
从上面的代码可以看出,执行polll(time,unit)方法分为以下步骤:
首先要获得锁。
然后判断count是否等于0,如果等于0,则证明队列为空,将阻止等待;否则,执行dequeue()方法作为元素出队。
l终于解锁了。
dequeue()这里不再重复方法。
3.8.remove()实施remove()方法后有两个结果:
当l队列不是空的时候,返回队首值并移除
L队列为空时,抛出异常
ArrayBlockingQueueremove()方法实际上是调用父类AbstractQueremove()方法,源码如下:
publicremove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException();}
从上面的代码可以看出,remove()直接调用poll()方法。如果poll()方法返回结果为null,则抛出nosuchelementexception异常。
poll()这里不再重复方法。
3.9.peek()实施peek()方法后有两个结果:
当l队列不是空的时候,返回队的首值不会被移除
l队列空时返回nulll
peek()方法源代码如下:
publicpeek() { final ReentrantLock lock = this.lock;lock.lock(); // 加锁 try { return itemAt(takeIndex); // 空则返回null } finally {lock.unlock(); // 解锁 }} finalitemAt(int i) { return (E) items[i];}
从上面的代码可以看出,peek()方法比较简单,直接获得了几组中的索引为takeIndex的元素。
3.10.element()执行element()方法后有两个结果:
当l队列不是空的时候,返回队的首值不会被移除
L队列为空时,抛出异常
element()方法实际上是调用父类AbstractQueelement()方法,源码如下:
publicelement() { E x = peek(); if (x != null) return x; else throw new NoSuchElementException();}
从上面的代码可以看出,在执行element()方法时,首先要获得peek()方法的结果。如果结果是null,则抛出nosuchelentexception异常。
4.ArrrayBlockingQueue单元测试ArrayBlockingQueue的单元测试如下:
package com.waylau.java.demo.datastructure; import static org.junit.jupiter.api.Assertions.assertEquals;import static org.junit.jupiter.api.Assertions.assertFalse;import static org.junit.jupiter.api.Assertions.assertNotNull;import static org.junit.jupiter.api.Assertions.assertNull;import static org.junit.jupiter.api.Assertions.assertThrows;import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.NoSuchElementException;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; /** * ArrayBlockingQueue Tests * * @since 1.0.0 2020年5月3日 * @author <a href="https://waylau.com">Way Lau</a> */class ArrayBlockingQueueTests {@Test void testOffer() {// 初始化队列 BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3); // 测试队列未满时,返回 true boolean resultNotFull = queue.offer("Java"); assertTrue(resultNotFull); // 测试队列满则,返回 falsequeue.offer("C");queue.offer("Python"); boolean resultFull = queue.offer("C++"); assertFalse(resultFull); } @Test void testPut() throws InterruptedException {// 初始化队列 BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3); // 测试队列未满时,无返回值直接插入;queue.put("Java"); // 测试队列满则, 会阻止等待,直到队列不满。queue.put("C");queue.put("Python");queue.put("C++"); // 阻塞等待 } @Test void testOfferTime() throws InterruptedException {// 初始化队列 BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3); // 测试队列未满时,返回 true boolean resultNotFull = queue.offer("Java", 5, TimeUnit.SECONDS); assertTrue(resultNotFull); // 测试队列满则,返回 falsequeue.offer("C");queue.offer("Python"); boolean resultFull = queue.offer("C++", 5, TimeUnit.SECONDS); // 等5秒 assertFalse(resultFull); } @Test void testAdd() {// 初始化队列 BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3); // 测试队列未满时,返回 true boolean resultNotFull = queue.add("Java"); assertTrue(resultNotFull); // 测试队列满后,抛出异常queueee.add("C");queue.add("Python"); Throwable excpetion = assertThrows(IllegalStateException.class, () -> {queue.add("C++");// 抛异常 }); assertEquals("Queue full", excpetion.getMessage()); } @Test void testPoll() throws InterruptedException {// 初始化队列 BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3); // 当测试队列为空时,返回 null String resultEmpty = queue.poll(); assertNull(resultEmpty); // 当测试队列不空时,返回队首值,移除queueee.put("Java");queue.put("C");queue.put("Python"); String resultNotEmpty = queue.poll(); assertEquals("Java", resultNotEmpty); } @Test void testTake() throws InterruptedException {// 初始化队列 BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3); // 当测试队列不空时,返回队首值,移除queueee.put("Java");queue.put("C");queue.put("Python"); String resultNotEmpty = queue.take(); assertEquals("Java", resultNotEmpty); // 当测试队列为空时,会堵塞等待,一直等到队列不空的时候再回队首值queue.clear(); String resultEmpty = queue.take(); // 阻塞等待 assertNotNull(resultEmpty); } @Test void testPollTime() throws InterruptedException {// 初始化队列 BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3); // 当测试队列不空时,返回队首值,移除queueee.put("Java");queue.put("C");queue.put("Python"); String resultNotEmpty = queue.poll(5, TimeUnit.SECONDS); assertEquals("Java", resultNotEmpty); // 当测试队列为空时,会堵塞等待,如果队列在指定时间内仍然是空的,则返回 nullqueue.clear(); String resultEmpty = queue.poll(5, TimeUnit.SECONDS); // 等待5秒 assertNull(resultEmpty); } @Test void testRemove() throws InterruptedException {// 初始化队列 BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3); // 当测试队列为空时,抛出异常 Throwable excpetion = assertThrows(NoSuchElementException.class, () -> {queue.remove();// 抛异常 }); assertEquals(null, excpetion.getMessage()); // 当测试队列不空时,返回队首值,移除queueee.put("Java");queue.put("C");queue.put("Python"); String resultNotEmpty = queue.remove(); assertEquals("Java", resultNotEmpty); } @Test void testPeek() throws InterruptedException {// 初始化队列 Queue<String> queue = new ArrayBlockingQueue<String>(3); // 当测试队列不空时,返回队伍首值,但不移除queueee.add("Java");queue.add("C");queue.add("Python"); String resultNotEmpty = queue.peek(); assertEquals("Java", resultNotEmpty);resultNotEmpty = queue.peek(); assertEquals("Java", resultNotEmpty);resultNotEmpty = queue.peek(); assertEquals("Java", resultNotEmpty); // 当测试队列为空时,回到nulqueeue.clear(); String resultEmpty = queue.peek(); assertNull(resultEmpty); } @Test void testElement() throws InterruptedException {// 初始化队列 Queue<String> queue = new ArrayBlockingQueue<String>(3); // 当测试队列不空时,返回队伍首值,但不移除queueee.add("Java");queue.add("C");queue.add("Python"); String resultNotEmpty = queue.element(); assertEquals("Java", resultNotEmpty);resultNotEmpty = queue.element(); assertEquals("Java", resultNotEmpty);resultNotEmpty = queue.element(); assertEquals("Java", resultNotEmpty); // 当测试队列为空时,抛出异常queue.clear(); Throwable excpetion = assertThrows(NoSuchElementException.class, () -> {queue.element();// 抛异常 }); assertEquals(null, excpetion.getMessage()); }}
5.ArrayBlockingQueue的应用案例以下是生产者-消费者的例子。这个例子模拟了一个生产者和两个消费者。当队列满时,会阻止生产者生产;当队列空时,会阻碍消费者消费。
package com.waylau.java.demo.datastructure; import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue; /** * ArrayBlockingQueue Demo * * @since 1.0.0 2020年5月3日 * @author <a href="https://waylau.com">Way Lau</a> */public class ArrayBlockingQueueDemo { public static void main(String[] args) { BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3); // 1个生产者 Producer p = new Producer(queue); // 2个消费者 Consumer c1 = new Consumer("c1", queue); Consumer c2 = new Consumer("c2", queue); // 启动线程 new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); }} class Producer implements Runnable { private final BlockingQueue<String> queue; Producer(BlockingQueue<String> queue) { this.queue = queue; } public void run() { try { while (true) {// 模拟耗时操作 Thread.sleep(1000L); queue.put(produce()); } } catch (InterruptedException ex) {ex.printStackTrace(); } } String produce() { String apple = "apple: " + System.currentTimeMillis(); System.out.println("produce " + apple); return apple; }} class Consumer implements Runnable { private final BlockingQueue<String> queue; private final String name; Consumer(String name, BlockingQueue<String> queue) { this.queue = queue; this.name = name; } public void run() { try { while (true) {// 模拟耗时操作 Thread.sleep(2000L); consume(queue.take()); } } catch (InterruptedException ex) {ex.printStackTrace(); } } void consume(Object x) { System.out.println(this.name + " consume " + x); }}
操作上述程序,输出内容如下:
produce apple: 1590308383034c2 consume apple: 1590308334 apple: 1590308384034c1 consume apple: 1590308344. apple: 1590308385036c2 consume apple: 159030838536 apple: 1590308386036c1 consume apple: 159030836036 apple: 1590308387036c2 consume apple: 159030837036 apple: 1590308388036c1 consume apple: 15903088836 apple: 1590308389041c2 consume apple: 15903088904 apple: 1590308390041c1 consume apple: 15903089004 apple: 1590308391042c2 consume apple: 15903089104 apple: 1590308392042c1 consume apple: 1590308392042
6.参考引用本系列归档为Java数据结构和算法实战:https://github.com/waylau/java-data-structures-and-algorithms-in-action