当前位置: 首页 > 图灵资讯 > 技术篇> AQS家族的“外门弟子”:CyclicBarrier

AQS家族的“外门弟子”:CyclicBarrier

来源:图灵教育
时间:2023-06-20 09:33:34

今天,让我们学习AQS家族的“外门弟子”:CyclicBarrier。

为何说CyclicBarrier是AQS家族的“外门弟子”?那是因为cyclicbarier本身和内部类Generation并没有继承AQS,而是在实现源代码时深深依赖AQS家族的成员Reentrantlock。就像修仙小说一样,大家庭会区分外门和内门,外门弟子通常会借助内门弟子的名声行事。Cyclicbarrier就是这样,所以被认为是AQS家族的“外门弟子”。在实际面试中,Cyclicbarier出现的次数较少,通常与Countdownlatch进行比较。

今天,我们将逐步拆解cyclicbarier,看看它和countdownlatch有什么区别。

什么是Cyclicbarier?

首先从Cyclicbarier的名字开始。Cyclic是一个形容词,翻译成“循环、循环”。barier是一个名词,翻译成“屏障、栅栏”,组合成“循环屏障”,那么如何理解“循环屏障”呢?让我们来看看Cyclicbarier的注释是如何解释的:

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.Cyclicbarier是同步辅助工具,允许一组线程等待彼此到达共同的屏障点。

The barrier is called cyclic because it can be re-used after the waiting threads are released.屏障被称为循环屏障,因为它可以在等待线程释放后重复使用。

看起来和CountDownlatch有些相似,我们通过一张图片来展示Cyclicbarier是如何工作的:

20.AQS家族的“外门弟子”:CyclicBarrier_System

当部分线程到达屏障时,它们将在屏障处等待。只有当所有线程到达屏障时,它们才会继续执行。如果以CountDownlatch中的越野徒步旅行为例,拿走老板,等待球员之间的是Cyclicbarier。

另外,注释说Cyclicbarier是“re-used“,可以重复使用。回顾CountDownlatch的实现,没有做任何重置计数器的工作,即当CountDownlatch的计数减少到0后无法恢复,也就是说,CountDownlatch的功能是一次性的。

Tips:事实上,CountDownlatch可以实现类似Cyclicbarier的功能。

如何使用Cyclicbarier?

我们以没有老板参加的越野徒步为例。一些先到的球员必须等待后到的球员一起吃午饭。Cyclicbarrier实现的代码是这样的:

// Cyclicbariercyclicbaririer cyclicBarrier = new CyclicBarrier(10);for (int i = 0; i < 10; i++) {  int finalI = i;  new Thread(() -> {    try {      TimeUnit.SECONDS.sleep((finalI + 1));    } catch (InterruptedException e) {      throw new RuntimeException(e);    }    try {      System.out.println(“选手[””” + finalI + "]到达终点,等待其他选手!!!!");            // 在屏障点等待线程      cyclicBarrier.await();            System.out.println(“选手[””” + finalI + "开始吃午饭!!!");    } catch (InterruptedException | BrokenBarrierException e) {      throw new RuntimeException(e);    }  }).start();}

用法与Countdownlatch非常相似。Cyclicbarier需要多少线程才能达到屏障,区别在于Cyclicbarier在每个线程中都被调用CyclicBarrier#await,当我们使用CountDownlatch时,我们只在主线程中调用了一次CountDownLatch#await

CountDownlatch可以在线程中调用CountDownLatch#await答案是肯定的,这样使用的效果和Cyclicbarier一样:

CountDownLatch countDownLatch = new CountDownLatch(10);for (int i = 0; i < 10; i++) {  int finalI = i;  new Thread(() -> {    try {      TimeUnit.SECONDS.sleep((finalI + 1));    } catch (InterruptedException e) {      throw new RuntimeException(e);    }    System.out.println(“选手[””” + finalI + 到达终点!!!!");    countDownLatch.countDown();    try {      countDownLatch.await();      System.out.println(“选手[””” + finalI + "开始吃午饭!!!");    } catch (InterruptedException e) {      throw new RuntimeException(e);    }  }).start();}

通过以上例子,不难想到CyclicBarrier#await该方法同时具备CountDownLatch#countDown方法和CountDownLatch#await该方法的能力,即计数减1和暂停线程。

Cyclicbarier是如何实现的?

让我们对Cyclicbarierier有一个整体的了解:

20.AQS家族的“外门弟子”:CyclicBarrier_重置_02

Cyclicbarier的内部结构比CountDownlatch比较复杂,除了我们前面提到的借助AQS的“内门弟子”Reentrantlock类型。lock与Condition类型相比trip此外,Cyclicbarier还有两个“特别”的地方:

  • 内部类Generation,直译为“代”,它起什么作用?
  • Runnnable成员变量barrierCommand,它又做了什么?

其余部分,大部分都可以在CountDownlatch中找到相应的方法,或者我们可以通过名称轻松了解它们的功能。

Cyclicbarier的构造方法

Cyclicbarier提供了两种(实际上是一种)结构方法:

// privatete需要到达屏障的线程数 final int parties;// 所有线程到达后执行的动作privatete final Runnable barrierCommand;// private计数器 int count;public CyclicBarrier(int parties) {  this(parties, null);}public CyclicBarrier(int parties, Runnable barrierAction) {  if (parties <= 0) {    throw new IllegalArgumentException();  }  this.parties = parties;  this.count = parties;  this.barrierCommand = barrierAction;}

第二个结构函数接收了两个参数:

  • parties:需要多少线程才能到达屏障调用CyclicBarrier#await
  • barrierAction:所有线程到达屏障后执行的动作。

构造方法的代码一如既往的简单,只有一个地方容易产生疑惑,partiescount有什么区别?

首先,我们来看看成员变量的声明,parties使用了final,这表明它是一个不可变的对象,代表CyclicBarier需要几个线程才能到达屏障;count是计数器,初始值是parties,随着到达屏障的线程数量的增加count会逐渐减少到0。

内部类Generation,cyclicbarier
private static class Generation {  Generation() {}      boolean broken;}

Generation用于标记Cyclicbarier的现代,Doug Lea是这样解释它的作用的:

Each use of the barrier is represented as a generation instance. The generation changes whenever the barrier is tripped, or is reset.每次使用屏障(CyclicBarrier)都需要一个Generation的例子。Generation无论是通过屏障还是重置屏障都会发生变化。

在Generation中broken用来标记当前的Cyclicbarier是否被打破,默认为false,值为true时,说明Cyclicbarier已经被打破,此时Cyclicbarier不能正常使用,需要调用CyclicBarrier#reset方法重置CyclicBarier的状态。

CyclicBarrier#await方法

我们在前面猜测CyclicBarrier#await该方法实现了计数减1和线程等待功能。让我们通过源代码验证我们的想法:

public int await() throws InterruptedException, BrokenBarrierException {  try {    return dowait(false, 0L);  } catch (TimeoutException toe) {    throw new Error(toe);  }}public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {  return dowait(true, unit.toNanos(timeout));}

两种重载方法都指向CyclicBarrier#dowait方法:

private int dowait(boolean timed, long nanos)  throws InterruptedException, BrokenBarrierException, TimeoutException {  // 使用Reentrantlock  final ReentrantLock lock = this.lock;  lock.lock();    try {    // 第2部分    // 获得Cyclicbarier的现代,检查Cyclicbarier是否被打破    final Generation g = generation;    if (g.broken) {      throw new BrokenBarrierException();    }        // 当线程中断时,调用breakbarier方法    if (Thread.interrupted()) {      breakBarrier();      throw new InterruptedException();    }        // 第3部分    ///计数器减1    int index = --count;    // 当计数器为0时,意味着所有的线程都到了,此时要做的就是唤醒等待中的线程    if (index == 0) {      boolean ranAction = false;      try {        // 唤醒前执行操作        final Runnable command = barrierCommand;        if (command != null) {          command.run();        }        ranAction = true;        // Cyclicbarier进入下一代        nextGeneration();        return 0;      } finally {        if (!ranAction) {          breakBarrier();        }      }    }        // 第4部分    // 只有部分线程到达屏障    for (;;) {      try {        //调用等待逻辑)        if (!timed) {          trip.await();        } else if (nanos > 0L) {          nanos = trip.awaitNanos(nanos);        }      } catch (InterruptedException ie) {        // 当线程中断时,调用breakbarier方法        if (g == generation && ! g.broken) {          breakBarrier();          throw ie;        } else {          Thread.currentThread().interrupt();        }      }      if (g.broken) {        throw new BrokenBarrierException();      }      // 假如不是现代,返回计数器的值      if (g != generation) {        return index;      }      // 若等待超时,调用breakbarier方法      if (timed && nanos <= 0L) {        breakBarrier();        throw new TimeoutException();      }    }  } finally {    lock.unlock();  }}

CyclicBarrier#dowait这种方法看起来很长,但逻辑并不复杂,如果分成三部分:

  • 第一部分:CyclicBarrier和线程的状态验证;
  • 第二部分:当计数器减1后值为0时,唤醒所有等待线程;
  • 第三部分:当计数器减1后值不为0时,线程进入等待状态。

首先,让我们来看看Cyclicbarier和线程状态验证的第一部分。首先,判断Cyclicbarier是否被打破,然后判断当前线程是否中断。如果是,请调用它CyclicBarrier#breakBarrier方法:

private void breakBarrier() {  generation.broken = true;  count = parties;  trip.signalAll();}

CyclicBarrier#breakBarrier方法很简单,只做了三件事:

  • 标记Cyclicbarier被打破;
  • 重置Cyclicbarier的计数器;
  • 唤醒所有等待的线程。

也就是说,一旦线程标记为中断状态,Cyclicbarier的屏障就会被直接打破。

先跳过第二部分的唤醒逻辑,直接看第三部分线程进入等待状态的逻辑。根据timed参数选择调用不同的Condition等待方法,然后处理异常处理和线程中断状态,也调用CyclicBarrier#breakBarrier,标记Cyclicbarier是不可用的。线程进入等待状态的逻辑并不复杂,本质上是通过AQS的Condition来实现的。

最后,第二部分唤醒所有等待中线程的操作,并根据计数器是否为0来判断是否需要唤醒。如果需要唤醒,最后一个执行CyclicBarrier#await的线程执行barrierCommand(此时尚未执行任何线程唤醒操作),做屏障前的处理操作,然后调用CyclicBarrier#nextGeneration方法:

private void nextGeneration() {  trip.signalAll();  count = parties;  generation = new Generation();}

CyclicBarrier#nextGeneration方法也做了三件事:

  • 唤醒Condition上等待的所有线程;
  • 重置Cyclicbarier的计数器;
  • 创建新的Generation对象。

这与进入“下一代”的名字非常一致。首先唤醒“上一代”等待中的所有线程,然后重置Cyclicbarier的计数器,最后更新Cyclicbarier的Generation对象,重置Cyclicbarier,使Cyclicbarier进入下一个时代。

在这里,不难发现,Cyclicbarier本身只做了维护和重置计数器的工作,依靠AQS家族的成员来保证互斥性和线程的等待和唤醒:

  • Reentrantlock保证同时只能执行一个线程CyclicBarrier#await,也就是说,计数器同时只能维护一个线程;
  • Condition为CyclicBarrier提供了等待队列的条件,完成了线程的等待和唤醒。
CyclicBarrier#reset方法

最后,让我们来看看CyclicBarrier#reset方法:

public void reset() {  final ReentrantLock lock = this.lock;  lock.lock();  try {    // 主动打破Cyclicbarier    breakBarrier();    // 让Cyclicbarier进入下一代    nextGeneration();  } finally {    lock.unlock();  }}

CyclicBarrier#reset方法都是老面孔,先是老面孔。CyclicBarrier#breakBarrier打破上一代cyclicbarier,既然要重新开始,就不要“怀念”过去;最后调用CyclicBarrier#nextGeneration开始新时代。需要注意的是,这里加锁的目的是确保执行CyclicBarrier#reset没有任何线程正在执行CyclicBarrier#await方法。

好了,我们一起分析了Cyclicbarrier的核心内容,剩下的方法很简单。相信大家都能通过名字了解它们的作用,猜到它们的实现。

Tips:CyclicBarrier#getNumberWaiting为什么中加锁?

CountDownlatch和Cyclicbarier有什么区别?

在最后一部分,让我们回答一开始的面试问题。CountDownlatch和Cyclicbarier有什么区别?

第一点:CyclicBarier可以重复使用,CountDownlatch不能重复使用。

无论是正常使用结束还是调用CyclicBarrier#resetCyclicbarier可以重置内部计数器

第二点:Cyclicbarrier只阻止调用CyclicBarrier#awaitCountDownlatch可以阻塞任何一个或多个线程。

CountDownlatch将计数减1和阻塞分为CountDownLatch#countDownCountDownLatch#awaitCyclicbarier只有两种方法CyclicBarrier#await两步操作完成。如果在同一行程中连续CountDownLatch#countDownCountDownLatch#await则实现了与CyclicBarrier#await功能相同的方法。

结语

好了,今天就到这里。如果这篇文章对你有帮助,请表扬和支持。最后,欢迎大家关注金融钓鱼侠王有志分享硬核技术。下次见!