前言

本文隶属于专栏《100个问题搞定Java并发》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见100个问题搞定Java并发

正文

CyclicBarrier 是另外一种多线程并发控制工具。

和 CountDownLatch 非常类似,它也可以实现线程间的计数等待,但它的功能比 CountDownLatch 更加复杂且强大。

CyclicBarrier 可以理解为循环栅栏

栅栏就是一种障碍物,比如,通常在私人宅邸的周围就可以围上一圈栅栏,阻止闲杂人等入内。

这里当然就是用来阻止线程继续执行,要求线程在栅栏外等待。

前面 Cyclic 意为循环,也就是说这个计数器可以反复使用。

比如,我们将计数器设置为 10 ,那么湊齐第一批 10 个线程后,计数器就会归零,接着凑齐下一批 10 个线程,这就是循环栅栏内在的含义。

CyclicBarrier 的使用场景也很丰富。

比如,司令下达命令,要求 10 个士兵一起去完成项任务。

这时就会要求 10 个士兵先集合报到,接着,一起雄赳赳,气昂昂地去执行任务当 10 个士兵把自己手上的任务都执行完了,那么司令才能对外宣布,任务完成

CyclicBarrier 比 CountDownLatch 略微强大一些,它可以接收一个参数作为 barrierAction。

所谓 barrierAction 就是当计数器一次计数完成后,系统会执行的动作。

如下构造函数,其中, parties 表示计数总数,也就是参与的线程总数。

public CyclicBarrier(int parties, Runnable barrierAction) 

源码(JDK8)

/**
 * 允许一组线程全部等待彼此达到共同屏障点的同步辅助。 
 * 
 * 循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。 
 * 
 * 屏障是循环的 ,因为它可以在等待的线程被释放之后重新使用。
 *
 * CyclicBarrier支持一个可选的Runnable命令,每个屏障点运行一次,在派对中的最后一个线程到达之后,但在任何线程释放之前。 
 * 
 * 在任何一方继续进行之前,此屏障操作对更新共享状态很有用。
 * 
 * 如果屏障操作不依赖于执行方暂停的各方,那么该方可以在释放任何线程时执行该操作。 
 * 
 * 为了方便这一点,每次调用await()返回该线程在屏障上的到达索引。 
 * 
 * 然后,您可以选择哪个线程应该执行屏障操作,例如:
 * 
 * if (barrier.await() == 0) { // log the completion of this iteration }
 * 
 * CyclicBarrier对失败的同步尝试使用all-or-none断裂模型:
 * 
 * 如果线程由于中断,故障或超时而过早离开障碍点,那么在该障碍点等待的所有其他线程也将通过BrokenBarrierException (或InterruptedException)异常离开如果他们也在同一时间被打断的话。
 * 
 * 内存一致性效果:一个线程调用 await() happen-before 屏障操作,该屏障操作也 happen-before 另一个线程调用 await() 并且成功返回。
 */
public class CyclicBarrier

官方示例

package com.shockang.study.java.concurrent.aqs;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Solver {
    final int N;
    final float[][] data;
    final CyclicBarrier barrier;

    class Worker implements Runnable {
        int myRow;

        Worker(int row) {
            myRow = row;
        }

        public void run() {
            while (!done()) {
                processRow(myRow);

                try {
                    barrier.await();
                } catch (InterruptedException ex) {
                    return;
                } catch (BrokenBarrierException ex) {
                    return;
                }
            }
        }
    }


    public Solver(float[][] matrix) throws InterruptedException {
        data = matrix;
        N = matrix.length;
        Runnable barrierAction =
                new Runnable() {
                    public void run() {
                        mergeRows();
                    }
                };
        barrier = new CyclicBarrier(N, barrierAction);

        List<Thread> threads = new ArrayList<Thread>(N);
        for (int i = 0; i < N; i++) {
            Thread thread = new Thread(new Worker(i));
            threads.add(thread);
            thread.start();
        }

        // wait until done
        for (Thread thread : threads)
            thread.join();
    }

    private boolean done() {
        return true;
    }

    private void processRow(int myRow) {
    }

    private void mergeRows() {
    }
}

实践

下面的示例使用 CyclicBarrier 演示了上面提到的司令命令士兵完成任务的场景。


package com.shockang.study.java.concurrent.aqs;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static class Soldier implements Runnable {
        private String soldier;
        private final CyclicBarrier cyclic;

        Soldier(CyclicBarrier cyclic, String soldierName) {
            this.cyclic = cyclic;
            this.soldier = soldierName;
        }

        public void run() {
            try {
                //等待所有士兵到齐
                cyclic.await();
                doWork();
                //等待所有士兵完成工作
                cyclic.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }

        void doWork() {
            try {
                Thread.sleep(Math.abs(new Random().nextInt() % 10000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(soldier + ":任务完成");
        }
    }

    public static class BarrierRun implements Runnable {
        boolean flag;
        int N;

        public BarrierRun(boolean flag, int N) {
            this.flag = flag;
            this.N = N;
        }

        public void run() {
            if (flag) {
                System.out.println("司令:[士兵" + N + "个,任务完成!]");
            } else {
                System.out.println("司令:[士兵" + N + "个,集合完毕!]");
                flag = true;
            }
        }
    }

    public static void main(String args[]) throws InterruptedException {
        final int N = 10;
        Thread[] allSoldier = new Thread[N];
        boolean flag = false;
        CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
        //设置屏障点,主要是为了执行这个方法
        System.out.println("集合队伍!");
        for (int i = 0; i < N; ++i) {
            System.out.println("士兵 " + i + " 报道!");
            allSoldier[i] = new Thread(new Soldier(cyclic, "士兵 " + i));
            allSoldier[i].start();
        }
    }
}

控制台输出

集合队伍!
士兵 0 报道!
士兵 1 报道!
士兵 2 报道!
士兵 3 报道!
士兵 4 报道!
士兵 5 报道!
士兵 6 报道!
士兵 7 报道!
士兵 8 报道!
士兵 9 报道!
司令:[士兵10个,集合完毕!]
士兵 0:任务完成
士兵 3:任务完成
士兵 6:任务完成
士兵 4:任务完成
士兵 9:任务完成
士兵 8:任务完成
士兵 2:任务完成
士兵 5:任务完成
士兵 7:任务完成
士兵 1:任务完成
司令:[士兵10个,任务完成!]

说明

上述代码第 65 行创建了 CyclicBarrier 实例,并将计数器设置为 10 ,要求在计数器达到指标时,执行第 51 行的 run() 方法。

每一个士兵线程都会执行第 18 行定义的 run() 方法。

在第 24 行,每一个士兵线程都会等待,直到所有的士兵都集合完毕。

集合完毕意味着 CyclicBarrier 的一次计数完成,当再一次调用 CyclicBarrier.await() 方法时,会进行下一次计数。

第 22 行模拟了士兵的任务。

当一个士兵任务执行完,他就会要求 CyclicBarrier 开始下次计数,这次计数主要目的是监控是否所有的士兵都己经完成了任务。

一旦任务全部完成,第 42 行定义的 BarrierRun 就会被调用,打印相关信息。

上一篇 下一篇