当前位置: 首页 > 图灵资讯 > 技术篇> CyclicBarrier 多线程都能“齐步走”的艺术【Java多线程必备】

CyclicBarrier 多线程都能“齐步走”的艺术【Java多线程必备】

来源:图灵教育
时间:2023-04-25 10:55:32

一、介绍

在某个时刻控制多个线程以达到共同的屏障点(Barrier Point),然后同时继续执行。当所有线程到达屏障点时,屏障将打开,所有线程可以继续执行,直到所有线程再次到达下一个屏障点。

二、特性

1. 可重用

当所有线程到达屏障点时,可以重置屏障,使所有线程再次从屏障点执行。

2. 可带参数

在所有线程到达屏障点后,可以指定Runnable对象,首先执行Runnable对象。

3. 线程同步

直到所有线程到达屏障点,才会继续执行。

4. 灵活性

屏障点的数量可以指定,即到达屏障点的线程数量,或者在构造函数中指定一个处理器来处理到达屏障点的所有线程。

5. 线程安全

Cyclicbarier是一个安全的线程,可以安全地用于多线程环境。

三、实现原则

实现原理是基于reentrantlock和condition。它使用计数器记录到达屏障点的线程数量。当线程调用await()时,它会被阻塞并添加到等待队列中,直到计数器的值达到指定数量。此时,所有被阻塞的线程将被唤醒并继续执行。

Cyclicbarier将在计数器值达到指定数量后重新初始化计数器,并调用指定的Runnable对象。同时,它还将使用bolean类型的变量来记录屏障是否打开。

四、应用场景

1. 数据流水线

每个线程处理数据后,需要等待其他线程完成,才能继续执行下一阶段。

2. 并行计算

等待所有线程完成任务,然后总结结果。

3. 多线程初始化

在开始执行之前,需要等待其他线程的初始化。

五、扩展内容

1. Cyclicbarierreset()方法可以重置屏障,使计数器恢复到初始值。如果在等待时调用reset()方法,所有正在等待的线程都会抛出brokenbarierexception异常。

2. Runnnable对象可以在Cyclicbarier的构造函数中指定。当计数器值达到设定值时,Runnable对象将自动执行。

3. Cyclicbarierawait()方法有多个重载版本,其中最常用的是无参数的await()方法,它会让当前线程等待到所有线程到达屏障点。还有一个带参数的await(long timeout, TimeUnit unit)该方法允许当前线程等待一定的时间。如果其他线程在指定时间内未到达屏障点,则当前线程将抛出Timeoutexception异常。

4. 在Java 在8中,Cyclicbarier还增加了两种方法:getnumberwaiting()和getparties(),可以分别获得当前等待的线程数量和参与屏障的线程总数。

5. Cyclicbarrier适用于需要相互等待的一组线程,然后共同执行下一个操作场景,如多线程计算矩阵、多线程下载文件合并等。

六、Cyclicbarier与CountDownlatch的区别

1. Cyclicbarier计数器可重置,可重复使用;而CountDownlatch计数器只能使用一次;

2. Cyclicbarier可以让一组线程在到达屏障点之前相互等待,然后一起执行下一个操作;CountDownlatch只能让等待线程在计数器为0时同时执行下一个操作;

3. 当计数器值达到设定值时,Cyclicbarier可以指定Runnable对象自动执行;CountDownlatch没有类似的功能;

如果需要在多个线程之间反复同步,Cyclicbarier是更好的选择;如果下一步只需要在所有线程准备好后一起执行,那么CountDownlatch是更合适的选择。

七、实际应用

1. 案例一

(1) 场景

等待三个线程到达屏障点后才能继续执行。

(2) 代码

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.concurrent.CyclicBarrier;/** * Cyclicbariercase * 简单的CyclicBarier示例: 等待设置的线程到达屏障点后才能继续执行。 * * @author wxy * @since 2023-04-23 */public class Cyclicbariercase {    private static final Logger LOGGER = LoggerFactory.getLogger(Cyclicbariercase.class);    public static void main(String[] args) {        int count = 3;        CyclicBarrier barrier = new CyclicBarrier(count, () -> {            LOGGER.info(“完成所有任务”);        });        for (int index1 = 1; index1 <= count; index1++ {            new Thread(() -> {                try {                    for (int index2 = 1; index2 <= 2; index2++ {                        LOGGER.info(线程{}执行第{}次任务”, Thread.currentThread().getName(), index2);                    }                    LOGGER.info("线程" + Thread.currentThread().getName() + "等待开始";                    barrier.await();                    LOGGER.info("线程" + Thread.currentThread().getName() + "等待完";                } catch (Exception e) {                    e.printStackTrace();                }            }, "Thread" + index1).start();        }    }}

在这个例子中,Cyclicbarrier创建了一个屏障,屏障的计数器值为3,表示需要等待三个线程到达屏障点才能继续执行。每个线程执行两个任务后,将调用await()等待其他线程完成任务。当所有线程到达屏障点时,将执行指定的Runnable对象,并输出“所有任务完成”。输出结果如下:

 

CyclicBarrier 多线程都能“齐步走”的艺术【Java多线程必备】_CyclicBarrier

2. 案例二

(1) 场景

以下是使用Cyclicbarier实现多线程计算矩阵和示例代码

(2) 代码

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.concurrent.CyclicBarrier;/** * MatrixSumCase * Cyclicbarier实现多线程计算矩阵和示例 * * @author wxy * @since 2023-04-23 */public class MatrixSumCase {    private static final Logger LOGGER = LoggerFactory.getLogger(MatrixSumCase.class);    /**     * 矩阵大小     */    private static final int N = 1000;    /**     * 线程数     */    private static final int THREADS = 10;    /**     * 矩阵     */    private static final int[][] MATRIX = new int[N][N];    /**     * 行和数组     */    private static final int[] ROW_SUMS = new int[N];    private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(THREADS, new Runnable() {        // 所有线程到达屏障点后执行的操作        public void run() {            int sum = 0;            for (int i = 0; i < N; i++) {                sum += ROW_SUMS[i];            }            LOGGER.info("Matrix sum is " + sum);        }    });    public static void main(String[] args) {        // 初始化矩阵        for (int i = 0; i < N; i++) {            for (int j = 0; j < N; j++) {                MATRIX[i][j] = i + j;            }        }        // 创建多个线程计算矩阵的行和        for (int index = 0; index < THREADS; index++) {            final int threadId = index;            new Thread(new Runnable() {                public void run() {                    int start = threadId * (N / THREADS);                    int end = (threadId == THREADS - 1) ? N : (threadId + 1) * (N / THREADS);                    for (int i = start; i < end; i++) {                        int rowSum = 0;                        for (int j = 0; j < N; j++) {                            rowSum += MATRIX[i][j];                        }                        ROW_SUMS[i] = rowSum;                    }                    try {                        // 等待其他线程到达屏障点                        CYCLIC_BARRIER.await();                    } catch (Exception ex) {                        ex.printStackTrace();                    }                }            }).start();        }    }}

在上述示例代码中,首先创建一个Nxn矩阵,并使用多个线程来计算矩阵的每行和,并将结果保存在一个长度为N的数组中。使用CyclicBarrier实现多个线程之间的同步,并在所有线程计算后执行后续的求和操作。输出结果如下:

 

CyclicBarrier 多线程都能“齐步走”的艺术【Java多线程必备】_多线程_02