一、介绍
在某个时刻控制多个线程以达到共同的屏障点(Barrier Point),然后同时继续执行。当所有线程到达屏障点时,屏障将打开,所有线程可以继续执行,直到所有线程再次到达下一个屏障点。
二、特性1. 可重用
当所有线程到达屏障点时,可以重置屏障,使所有线程再次从屏障点执行。
2. 可带参数
在所有线程到达屏障点后,可以指定Runnable对象,首先执行Runnable对象。
3. 线程同步
直到所有线程到达屏障点,才会继续执行。
4. 灵活性
屏障点的数量可以指定,即到达屏障点的线程数量,或者在构造函数中指定一个处理器来处理到达屏障点的所有线程。
5. 线程安全
Cyclicbarier是一个安全的线程,可以安全地用于多线程环境。
三、实现原则实现原理是基于reentrantlock和condition。它使用计数器记录到达屏障点的线程数量。当线程调用await()时,它会被阻塞并添加到等待队列中,直到计数器的值达到指定数量。此时,所有被阻塞的线程将被唤醒并继续执行。
Cyclicbarier将在计数器值达到指定数量后重新初始化计数器,并调用指定的Runnable对象。同时,它还将使用bolean类型的变量来记录屏障是否打开。
四、应用场景1. 数据流水线
每个线程处理数据后,需要等待其他线程完成,才能继续执行下一阶段。
2. 并行计算
等待所有线程完成任务,然后总结结果。
3. 多线程初始化
在开始执行之前,需要等待其他线程的初始化。
五、扩展内容1. Cyclicbarierreset()方法可以重置屏障,使计数器恢复到初始值。如果在等待时调用reset()方法,所有正在等待的线程都会抛出brokenbarierexception异常。
2. Runnnable对象可以在Cyclicbarier的构造函数中指定。当计数器值达到设定值时,Runnable对象将自动执行。
3. Cyclicbarierawait()方法有多个重载版本,其中最常用的是无参数的await()方法,它会让当前线程等待到所有线程到达屏障点。还有一个带参数的await(long timeout, TimeUnit unit)该方法允许当前线程等待一定的时间。如果其他线程在指定时间内未到达屏障点,则当前线程将抛出Timeoutexception异常。
4. 在Java 在8中,Cyclicbarier还增加了两种方法:getnumberwaiting()和getparties(),可以分别获得当前等待的线程数量和参与屏障的线程总数。
5. Cyclicbarrier适用于需要相互等待的一组线程,然后共同执行下一个操作场景,如多线程计算矩阵、多线程下载文件合并等。
六、Cyclicbarier与CountDownlatch的区别1. Cyclicbarier计数器可重置,可重复使用;而CountDownlatch计数器只能使用一次;
2. Cyclicbarier可以让一组线程在到达屏障点之前相互等待,然后一起执行下一个操作;CountDownlatch只能让等待线程在计数器为0时同时执行下一个操作;
3. 当计数器值达到设定值时,Cyclicbarier可以指定Runnable对象自动执行;CountDownlatch没有类似的功能;
七、实际应用如果需要在多个线程之间反复同步,Cyclicbarier是更好的选择;如果下一步只需要在所有线程准备好后一起执行,那么CountDownlatch是更合适的选择。
1. 案例一
(1) 场景
等待三个线程到达屏障点后才能继续执行。
(2) 代码
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.concurrent.CyclicBarrier;/** * Cyclicbariercase * 简单的CyclicBarier示例: 等待设置的线程到达屏障点后才能继续执行。 * * @author wxy * @since 2023-04-23 */public class Cyclicbariercase { private static final Logger LOGGER = LoggerFactory.getLogger(Cyclicbariercase.class); public static void main(String[] args) { int count = 3; CyclicBarrier barrier = new CyclicBarrier(count, () -> { LOGGER.info(“完成所有任务”); }); for (int index1 = 1; index1 <= count; index1++ { new Thread(() -> { try { for (int index2 = 1; index2 <= 2; index2++ { LOGGER.info(线程{}执行第{}次任务”, Thread.currentThread().getName(), index2); } LOGGER.info("线程" + Thread.currentThread().getName() + "等待开始"; barrier.await(); LOGGER.info("线程" + Thread.currentThread().getName() + "等待完"; } catch (Exception e) { e.printStackTrace(); } }, "Thread" + index1).start(); } }}
在这个例子中,Cyclicbarrier创建了一个屏障,屏障的计数器值为3,表示需要等待三个线程到达屏障点才能继续执行。每个线程执行两个任务后,将调用await()等待其他线程完成任务。当所有线程到达屏障点时,将执行指定的Runnable对象,并输出“所有任务完成”。输出结果如下:
2. 案例二
(1) 场景
以下是使用Cyclicbarier实现多线程计算矩阵和示例代码
(2) 代码
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.concurrent.CyclicBarrier;/** * MatrixSumCase * Cyclicbarier实现多线程计算矩阵和示例 * * @author wxy * @since 2023-04-23 */public class MatrixSumCase { private static final Logger LOGGER = LoggerFactory.getLogger(MatrixSumCase.class); /** * 矩阵大小 */ private static final int N = 1000; /** * 线程数 */ private static final int THREADS = 10; /** * 矩阵 */ private static final int[][] MATRIX = new int[N][N]; /** * 行和数组 */ private static final int[] ROW_SUMS = new int[N]; private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(THREADS, new Runnable() { // 所有线程到达屏障点后执行的操作 public void run() { int sum = 0; for (int i = 0; i < N; i++) { sum += ROW_SUMS[i]; } LOGGER.info("Matrix sum is " + sum); } }); public static void main(String[] args) { // 初始化矩阵 for (int i = 0; i < N; i++) { for (int j = 0; j < N; j++) { MATRIX[i][j] = i + j; } } // 创建多个线程计算矩阵的行和 for (int index = 0; index < THREADS; index++) { final int threadId = index; new Thread(new Runnable() { public void run() { int start = threadId * (N / THREADS); int end = (threadId == THREADS - 1) ? N : (threadId + 1) * (N / THREADS); for (int i = start; i < end; i++) { int rowSum = 0; for (int j = 0; j < N; j++) { rowSum += MATRIX[i][j]; } ROW_SUMS[i] = rowSum; } try { // 等待其他线程到达屏障点 CYCLIC_BARRIER.await(); } catch (Exception ex) { ex.printStackTrace(); } } }).start(); } }}
在上述示例代码中,首先创建一个Nxn矩阵,并使用多个线程来计算矩阵的每行和,并将结果保存在一个长度为N的数组中。使用CyclicBarrier实现多个线程之间的同步,并在所有线程计算后执行后续的求和操作。输出结果如下: