今天,让我们学习AQS家族的“外门弟子”:CyclicBarrier。
为何说CyclicBarrier是AQS家族的“外门弟子”?那是因为cyclicbarier本身和内部类Generation并没有继承AQS,而是在实现源代码时深深依赖AQS家族的成员Reentrantlock。就像修仙小说一样,大家庭会区分外门和内门,外门弟子通常会借助内门弟子的名声行事。Cyclicbarrier就是这样,所以被认为是AQS家族的“外门弟子”。在实际面试中,Cyclicbarier出现的次数较少,通常与Countdownlatch进行比较。
今天,我们将逐步拆解cyclicbarier,看看它和countdownlatch有什么区别。
什么是Cyclicbarier?首先从Cyclicbarier的名字开始。Cyclic是一个形容词,翻译成“循环、循环”。barier是一个名词,翻译成“屏障、栅栏”,组合成“循环屏障”,那么如何理解“循环屏障”呢?让我们来看看Cyclicbarier的注释是如何解释的:
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.Cyclicbarier是同步辅助工具,允许一组线程等待彼此到达共同的屏障点。
The barrier is called cyclic because it can be re-used after the waiting threads are released.屏障被称为循环屏障,因为它可以在等待线程释放后重复使用。
看起来和CountDownlatch有些相似,我们通过一张图片来展示Cyclicbarier是如何工作的:
当部分线程到达屏障时,它们将在屏障处等待。只有当所有线程到达屏障时,它们才会继续执行。如果以CountDownlatch中的越野徒步旅行为例,拿走老板,等待球员之间的是Cyclicbarier。
另外,注释说Cyclicbarier是“re-used“,可以重复使用。回顾CountDownlatch的实现,没有做任何重置计数器的工作,即当CountDownlatch的计数减少到0后无法恢复,也就是说,CountDownlatch的功能是一次性的。
Tips:事实上,CountDownlatch可以实现类似Cyclicbarier的功能。
如何使用Cyclicbarier?我们以没有老板参加的越野徒步为例。一些先到的球员必须等待后到的球员一起吃午饭。Cyclicbarrier实现的代码是这样的:
// Cyclicbariercyclicbaririer cyclicBarrier = new CyclicBarrier(10);for (int i = 0; i < 10; i++) { int finalI = i; new Thread(() -> { try { TimeUnit.SECONDS.sleep((finalI + 1)); } catch (InterruptedException e) { throw new RuntimeException(e); } try { System.out.println(“选手[””” + finalI + "]到达终点,等待其他选手!!!!"); // 在屏障点等待线程 cyclicBarrier.await(); System.out.println(“选手[””” + finalI + "开始吃午饭!!!"); } catch (InterruptedException | BrokenBarrierException e) { throw new RuntimeException(e); } }).start();}
用法与Countdownlatch非常相似。Cyclicbarier需要多少线程才能达到屏障,区别在于Cyclicbarier在每个线程中都被调用CyclicBarrier#await
,当我们使用CountDownlatch时,我们只在主线程中调用了一次CountDownLatch#await
。
CountDownlatch可以在线程中调用CountDownLatch#await
答案是肯定的,这样使用的效果和Cyclicbarier一样:
CountDownLatch countDownLatch = new CountDownLatch(10);for (int i = 0; i < 10; i++) { int finalI = i; new Thread(() -> { try { TimeUnit.SECONDS.sleep((finalI + 1)); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(“选手[””” + finalI + 到达终点!!!!"); countDownLatch.countDown(); try { countDownLatch.await(); System.out.println(“选手[””” + finalI + "开始吃午饭!!!"); } catch (InterruptedException e) { throw new RuntimeException(e); } }).start();}
通过以上例子,不难想到CyclicBarrier#await
该方法同时具备CountDownLatch#countDown
方法和CountDownLatch#await
该方法的能力,即计数减1和暂停线程。
让我们对Cyclicbarierier有一个整体的了解:
Cyclicbarier的内部结构比CountDownlatch比较复杂,除了我们前面提到的借助AQS的“内门弟子”Reentrantlock类型。lock
与Condition类型相比trip
此外,Cyclicbarier还有两个“特别”的地方:
- 内部类Generation,直译为“代”,它起什么作用?
- Runnnable成员变量
barrierCommand
,它又做了什么?
其余部分,大部分都可以在CountDownlatch中找到相应的方法,或者我们可以通过名称轻松了解它们的功能。
Cyclicbarier的构造方法Cyclicbarier提供了两种(实际上是一种)结构方法:
// privatete需要到达屏障的线程数 final int parties;// 所有线程到达后执行的动作privatete final Runnable barrierCommand;// private计数器 int count;public CyclicBarrier(int parties) { this(parties, null);}public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) { throw new IllegalArgumentException(); } this.parties = parties; this.count = parties; this.barrierCommand = barrierAction;}
第二个结构函数接收了两个参数:
parties
:需要多少线程才能到达屏障调用CyclicBarrier#await
;barrierAction
:所有线程到达屏障后执行的动作。
构造方法的代码一如既往的简单,只有一个地方容易产生疑惑,parties
和count
有什么区别?
首先,我们来看看成员变量的声明,parties
使用了final
,这表明它是一个不可变的对象,代表CyclicBarier需要几个线程才能到达屏障;count
是计数器,初始值是parties
,随着到达屏障的线程数量的增加count
会逐渐减少到0。
private static class Generation { Generation() {} boolean broken;}
Generation用于标记Cyclicbarier的现代,Doug Lea是这样解释它的作用的:
Each use of the barrier is represented as a generation instance. The generation changes whenever the barrier is tripped, or is reset.每次使用屏障(CyclicBarrier)都需要一个Generation的例子。Generation无论是通过屏障还是重置屏障都会发生变化。
在Generation中broken
用来标记当前的Cyclicbarier是否被打破,默认为false,值为true时,说明Cyclicbarier已经被打破,此时Cyclicbarier不能正常使用,需要调用CyclicBarrier#reset
方法重置CyclicBarier的状态。
我们在前面猜测CyclicBarrier#await
该方法实现了计数减1和线程等待功能。让我们通过源代码验证我们的想法:
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); }}public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout));}
两种重载方法都指向CyclicBarrier#dowait
方法:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 使用Reentrantlock final ReentrantLock lock = this.lock; lock.lock(); try { // 第2部分 // 获得Cyclicbarier的现代,检查Cyclicbarier是否被打破 final Generation g = generation; if (g.broken) { throw new BrokenBarrierException(); } // 当线程中断时,调用breakbarier方法 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 第3部分 ///计数器减1 int index = --count; // 当计数器为0时,意味着所有的线程都到了,此时要做的就是唤醒等待中的线程 if (index == 0) { boolean ranAction = false; try { // 唤醒前执行操作 final Runnable command = barrierCommand; if (command != null) { command.run(); } ranAction = true; // Cyclicbarier进入下一代 nextGeneration(); return 0; } finally { if (!ranAction) { breakBarrier(); } } } // 第4部分 // 只有部分线程到达屏障 for (;;) { try { //调用等待逻辑) if (!timed) { trip.await(); } else if (nanos > 0L) { nanos = trip.awaitNanos(nanos); } } catch (InterruptedException ie) { // 当线程中断时,调用breakbarier方法 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) { throw new BrokenBarrierException(); } // 假如不是现代,返回计数器的值 if (g != generation) { return index; } // 若等待超时,调用breakbarier方法 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); }}
CyclicBarrier#dowait
这种方法看起来很长,但逻辑并不复杂,如果分成三部分:
- 第一部分:CyclicBarrier和线程的状态验证;
- 第二部分:当计数器减1后值为0时,唤醒所有等待线程;
- 第三部分:当计数器减1后值不为0时,线程进入等待状态。
首先,让我们来看看Cyclicbarier和线程状态验证的第一部分。首先,判断Cyclicbarier是否被打破,然后判断当前线程是否中断。如果是,请调用它CyclicBarrier#breakBarrier
方法:
private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll();}
CyclicBarrier#breakBarrier
方法很简单,只做了三件事:
- 标记Cyclicbarier被打破;
- 重置Cyclicbarier的计数器;
- 唤醒所有等待的线程。
也就是说,一旦线程标记为中断状态,Cyclicbarier的屏障就会被直接打破。
先跳过第二部分的唤醒逻辑,直接看第三部分线程进入等待状态的逻辑。根据timed
参数选择调用不同的Condition等待方法,然后处理异常处理和线程中断状态,也调用CyclicBarrier#breakBarrier
,标记Cyclicbarier是不可用的。线程进入等待状态的逻辑并不复杂,本质上是通过AQS的Condition来实现的。
最后,第二部分唤醒所有等待中线程的操作,并根据计数器是否为0来判断是否需要唤醒。如果需要唤醒,最后一个执行CyclicBarrier#await
的线程执行barrierCommand
(此时尚未执行任何线程唤醒操作),做屏障前的处理操作,然后调用CyclicBarrier#nextGeneration
方法:
private void nextGeneration() { trip.signalAll(); count = parties; generation = new Generation();}
CyclicBarrier#nextGeneration
方法也做了三件事:
- 唤醒Condition上等待的所有线程;
- 重置Cyclicbarier的计数器;
- 创建新的Generation对象。
这与进入“下一代”的名字非常一致。首先唤醒“上一代”等待中的所有线程,然后重置Cyclicbarier的计数器,最后更新Cyclicbarier的Generation对象,重置Cyclicbarier,使Cyclicbarier进入下一个时代。
在这里,不难发现,Cyclicbarier本身只做了维护和重置计数器的工作,依靠AQS家族的成员来保证互斥性和线程的等待和唤醒:
- Reentrantlock保证同时只能执行一个线程
CyclicBarrier#await
,也就是说,计数器同时只能维护一个线程; - Condition为CyclicBarrier提供了等待队列的条件,完成了线程的等待和唤醒。
最后,让我们来看看CyclicBarrier#reset
方法:
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { // 主动打破Cyclicbarier breakBarrier(); // 让Cyclicbarier进入下一代 nextGeneration(); } finally { lock.unlock(); }}
CyclicBarrier#reset
方法都是老面孔,先是老面孔。CyclicBarrier#breakBarrier
打破上一代cyclicbarier,既然要重新开始,就不要“怀念”过去;最后调用CyclicBarrier#nextGeneration
开始新时代。需要注意的是,这里加锁的目的是确保执行CyclicBarrier#reset
没有任何线程正在执行CyclicBarrier#await
方法。
好了,我们一起分析了Cyclicbarrier的核心内容,剩下的方法很简单。相信大家都能通过名字了解它们的作用,猜到它们的实现。
Tips:CyclicBarrier#getNumberWaiting
为什么中加锁?
在最后一部分,让我们回答一开始的面试问题。CountDownlatch和Cyclicbarier有什么区别?
第一点:CyclicBarier可以重复使用,CountDownlatch不能重复使用。
无论是正常使用结束还是调用CyclicBarrier#reset
Cyclicbarier可以重置内部计数器
第二点:Cyclicbarrier只阻止调用CyclicBarrier#await
CountDownlatch可以阻塞任何一个或多个线程。
CountDownlatch将计数减1和阻塞分为CountDownLatch#countDown
和CountDownLatch#await
Cyclicbarier只有两种方法CyclicBarrier#await
两步操作完成。如果在同一行程中连续CountDownLatch#countDown
和CountDownLatch#await
则实现了与CyclicBarrier#await
功能相同的方法。
好了,今天就到这里。如果这篇文章对你有帮助,请表扬和支持。最后,欢迎大家关注金融钓鱼侠王有志分享硬核技术。下次见!