AQS AQS 简介 AQS是AbstractQueuedSynchronizer的简称,即抽象队列同步器,从字面意思上理解:
抽象:抽象类,只实现一些主要逻辑,有些方法由子类实现; 
队列:使用先进先出(FIFO)队列存储数据; 
同步:实现了同步的功能。 
 
AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的同步器,比如我们常用到的同步类 ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier,ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。
AQS 采用了模版方法实现,子类只需要根据需要去实现自己关心的protected 方法即可造出符合我们自己需求的同步器。
AQS 的数据结构 AQS内部使用了一个volatile的变量state来作为资源的标识。同时定义了几个获取和改版state的protected方法。
1 2 3 4 5 6 7 8 9 10 11 protected  final  int  getState ()      return  this .state; } protected  final  void  setState (int  newState)      this .state = newState; } protected  final  boolean  compareAndSetState (int  expect, int  update)      return  STATE.compareAndSet(this , expect, update); } 
这三种叫做均是原子操作,其中compareAndSetState的实现依赖于Unsafe的compareAndSwapInt()方法。
而AQS类本身实现的是一些排队和阻塞的机制,比如具体线程等待队列的维护(如获取资源失败入队/唤醒出队等)。它内部使用了一个先进先出(FIFO)的双端队列,并使用了两个指针head和tail用于标识队列的头部和尾部。但它并不是直接储存线程,而是储存拥有线程的Node节点。其数据结构如图:
资源共享模式 资源有两种共享模式,或者说两种同步方式:
独占模式(Exclusive):资源是独占的,一次只能一个线程获取。如ReentrantLock。 
共享模式(Share):同时可以被多个线程获取,具体的资源个数可以通过参数指定。如Semaphore/CountDownLatch。 
 
一般情况下,子类只需要根据需求实现其中一种模式,当然也有同时实现两种模式的同步类,如ReadWriteLock。
AQS中关于这两种资源共享模式的定义源码(均在内部类Node中)。我们来看看Node的结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 static  final  class  Node           static  final  Node SHARED = new  Node();          static  final  Node EXCLUSIVE = null ;           static  final  int  CANCELLED = 1 ;           static  final  int  SIGNAL = -1 ;          static  final  int  CONDITION = -2 ;          static  final  int  PROPAGATE = -3 ;          volatile  int  waitStatus;     volatile  Node prev;      volatile  Node next;      volatile  Thread thread;      Node nextWaiter;           final  boolean  isShared ()           return  nextWaiter == SHARED;     }     Node(Thread thread, Node mode) {              this .nextWaiter = mode;         this .thread = thread;     }      } private  Node addWaiter (Node mode)           Node node = new  Node(Thread.currentThread(), mode);      } 
通过Node我们可以实现两个队列,一是通过prev和next实现CLH队列(线程同步队列,双向队列),二是nextWaiter实现Condition条件上的等待线程队列(单向队列),这个Condition主要用在ReentrantLock类中。
AQS的主要方法源码解析 使用模版方法定好算法骨架 AQS的设计是基于模板方法模式的,AQS主要定义了下面几个模版方法来获取资源:
acquire :独占模式,不可中断 
acquireInterruptibly:独占模式,可中断(在线程中断时可能会抛出InterruptedException) 
acquireShared:共享模式,不可中断 
acquireSharedInterruptibly:共享模式,可中断 (在线程中断时可能会抛出InterruptedException) 
 
下面以 acquire 为例看下AQS的模版方法实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public  final  void  acquire (int  arg)      if  (!tryAcquire(arg) &&         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))         selfInterrupt(); } final  boolean  acquireQueued (final  Node node, int  arg)      boolean  failed = true ;     try  {         boolean  interrupted = false ;         for  (;;) {             final  Node p = node.predecessor();             if  (p == head && tryAcquire(arg)) {                 setHead(node);                 p.next = null ;                  failed = false ;                 return  interrupted;             }             if  (shouldParkAfterFailedAcquire(p, node) &&                 parkAndCheckInterrupt())                 interrupted = true ;         }     } finally  {         if  (failed)             cancelAcquire(node);     } } protected  boolean  tryAcquire (int  arg)    throw  new  UnsupportedOperationException(); }    
如上面的代码所示,AQS 已经定义好对应的获取锁的代码模版(算法骨架),子类可以根据需要去实现对应的方法即可,比如 Semaphore 只需要实现 tryAcquire 即可。通过使用模版方法,让AQS实现基本功能的同时又保持了良好的扩展性。 
1 2 3 4     protected  boolean  tryAcquire (int  arg)         throw  new  UnsupportedOperationException();     } 
注意,这里不使用抽象方法的目的是:避免强迫子类中把所有的抽象方法都实现一遍,减少无用功,这样子类只需要实现自己关心的抽象方法即可,比如 Semaphore 只需要实现 tryAcquire 方法而不用实现其余不需要用到的方法(似的用法在 AbstractList的addall实现。提供默认的方法method1…method4方法,每个方法直接抛出异常,使用模板方法的时候强制重写用到的method方法,用不到的method不用重写。)
AQS 预留给子类进行扩展的方法主要有下面几个:
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
 
子类根据需要去实现即可。
以独占方式获取资源为例 AQS获取资源的流程: 
获取资源的入口是acquire(int arg)方法。arg是要获取的资源的个数,在独占模式下始终为1。我们先来看看这个方法的逻辑:
1 2 3 4 5 public  final  void  acquire (int  arg)      if  (!tryAcquire(arg) &&         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))         selfInterrupt(); } 
首先调用tryAcquire(arg)尝试去获取资源。前面提到了这个方法是在子类具体实现的。
如果获取资源失败,就通过addWaiter(Node.EXCLUSIVE)方法把这个线程插入到等待队列中。其中传入的参数代表要插入的Node是独占式的。这个方法的具体实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private  Node addWaiter (Node mode)           Node node = new  Node(Thread.currentThread(), mode);          Node pred = tail;     if  (pred != null ) {         node.prev = pred;                  if  (compareAndSetTail(pred, node)) {             pred.next = node;             return  node;         }     }          enq(node);     return  node; } private  Node enq (final  Node node)      for  (;;) {         Node t = tail;         if  (t == null ) {              if  (compareAndSetHead(new  Node()))                 tail = head;         } else  {             node.prev = t;             if  (compareAndSetTail(t, node)) {                 t.next = node;                 return  t;             }         }     } } 
上面的两个函数比较好理解,就是在队列的尾部插入新的Node节点,但是需要注意的是由于AQS中会存在多个线程同时争夺资源的情况,因此肯定会出现多个线程同时插入节点的操作,在这里是通过CAS自旋的方式保证了操作的线程安全性。
现在回到最开始的aquire(int arg)方法。现在通过addWaiter方法,已经把一个Node放到等待队列尾部了。而处于等待队列的结点是从头结点一个一个去获取资源的。具体的实现我们来看看acquireQueued方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 final  boolean  acquireQueued (final  Node node, int  arg)           boolean  failed = true ;     try  {                  boolean  interrupted = false ;                  for  (;;) {                          final  Node p = node.predecessor();                          if  (p == head && tryAcquire(arg)) {                                  setHead(node);                 p.next = null ;                  failed = false ;                 return  interrupted;             }                          if  (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())                 interrupted = true ;         }     } finally  {         if  (failed)             cancelAcquire(node);     } } 
跳出当前循环的条件是当“前置节点是头结点,且当前线程获取锁成功”。为了防止因死循环导致CPU资源被浪费,我们会判断前置节点的状态来决定是否要将当前线程挂起,shouldParkAfterFailedAcquire代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private  static  boolean  shouldParkAfterFailedAcquire (Node pred, Node node)           int  ws = pred.waitStatus;          if  (ws == Node.SIGNAL)         return  true ;           if  (ws > 0 ) {         do  {                          node.prev = pred = pred.prev;         } while  (pred.waitStatus > 0 );         pred.next = node;     } else  {                  compareAndSetWaitStatus(pred, ws, Node.SIGNAL);     }     return  false ; } 
shouldParkAfterFailedAcquire流程如下:
资源释放 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public  final  boolean  release (int  arg)      if  (tryRelease(arg)) {         Node h = head;         if  (h != null  && h.waitStatus != 0 )             unparkSuccessor(h);         return  true ;     }     return  false ; } private  void  unparkSuccessor (Node node)           int  ws = node.waitStatus;     if  (ws < 0 )         compareAndSetWaitStatus(node, ws, 0 );          Node s = node.next;               if  (s == null  || s.waitStatus > 0 ) {         s = null ;                  for  (Node t = tail; t != null  && t != node; t = t.prev)             if  (t.waitStatus <= 0 )                 s = t;     }          if  (s != null )         LockSupport.unpark(s.thread); } 
通信工具类 我们常用到的通信工具类主要都是依靠借助AQS实现的(如semaphore、CountDownLatch、CyclicBarrier),实现的方式就是前面所说的根据需要去实现AQS预留的一些拓展方法:
AQS 预留给子类进行扩展的方法主要有下面几个:
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
 
JDK中提供了一些工具类以供开发者使用。这样的话我们在遇到一些常见的应用场景时就可以使用这些工具类,而不用自己再重复造轮子了。
Semaphore(信号量) 功能概述 Java提供了经典信号量Semaphore的实现,它通过控制一定数量的许可(permit)的方式,来达到限制通用资源访问的目的。Semaphore往往用于资源有限的场景中,用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。例如:控制并发的线程数。
可以把它简单的理解成我们停车场入口立着的那个显示屏,每有一辆车进入停车场显示屏就会显示剩余车位减1,每有一辆车从停车场出去,显示屏上显示的剩余车辆就会加1,当显示屏上的剩余车位为0时,停车场入口的栏杆就不会再打开,车辆就无法进入停车场了,直到有一辆车从停车场出去为止。
使用场景 比如:控制并发的线程数
比如:数据库连接池,同时进行连接的线程有数量限制,连接不能超过一定的数量,当连接达到了限制数量后,后面的线程只能排队等前面的线程释放了数据库连接才能获得数据库连接。
比如:停车场场景,车位数量有限,同时只能容纳多少台车,车位满了之后只有等里面的车离开停车场外面的车才可以进入。
原理 Semaphore是通过一个计数器(记录许可证的数量)来实现的:
常用方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 acquire()   acquire(int  permits)   acquireUninterruptibly()       tryAcquire() tryAcquire(long  timeout, TimeUnit unit) release() hasQueuedThreads() getQueueLength() drainPermits() availablePermits() 
案例 1、假如有一个落魄的停车场只能容纳3辆车。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public  class  SemaphoreDemo      static  class  MyThread  implements  Runnable           private  int  value;         private  Semaphore semaphore;         public  MyThread (int  value, Semaphore semaphore)               this .value = value;             this .semaphore = semaphore;         }         @Override          public  void  run ()               try  {                 semaphore.acquire();                  System.out.println(String.format("车辆 粤A***%d 驶入, 还剩%d个车位,还有%d个车辆在等待" ,                         value, semaphore.availablePermits(), semaphore.getQueueLength()));                                  Random random = new  Random();                 Thread.sleep(random.nextInt(1000 ));                 System.out.println(String.format("车辆 粤A%d 已经驶出" , value));             } catch  (InterruptedException e) {                 e.printStackTrace();             } finally {                 semaphore.release();              }         }     }     public  static  void  main (String[] args)           Semaphore semaphore = new  Semaphore(3 );         for  (int  i = 0 ; i < 10 ; i++) {             new  Thread(new  MyThread(i, semaphore)).start();         }     } } 
运行结果:
结果分析:最开始是A3, A 4 这辆车获得了资源车位,而其它车辆进入了等待队列。然后当A 
总结:Semaphore往往用于资源有限的场景中,去限制线程的数量。最主要的方法是acquire方法和release方法。acquire()方法会申请一个permit,而release方法会释放一个permit。如果减少到了0,再有其他线程来acquire,那就要阻塞这个线程直到有其它线程release permit为止。
源码解析 构造函数
1 2 3 4 5 6 7 8 9 public  Semaphore (int  permits)      sync = new  NonfairSync(permits); } public  Semaphore (int  permits, boolean  fair)      sync = fair ? new  FairSync(permits) : new  NonfairSync(permits); } 
Semaphore有两个构造函数:
参数permits表示许可数,它最后传递给了AQS的state值。线程在运行时首先获取许可,如果成功,许可数就减1,线程运行,当线程运行结束就释放许可,许可数就加1。如果许可数为0,则获取失败,线程位于AQS的等待队列中,它会被其它释放许可的线程唤醒。
fair,用于指定它的公平性。一般常用非公平的信号量,非公平信号量是指在获取许可时先尝试获取许可,而不必关心是否已有需要获取许可的线程位于等待队列中,如果获取失败,才会入列。而公平的信号量在获取许可时首先要查看等待队列中是否已有线程,如果有则入列排队。
 
acquire源代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public  void  acquire ()  throws  InterruptedException     sync.acquireSharedInterruptibly(1 ); } public  final  void  acquireSharedInterruptibly (int  arg)         throws  InterruptedException  {    if  (Thread.interrupted())         throw  new  InterruptedException();     if  (tryAcquireShared(arg) < 0 )         doAcquireSharedInterruptibly(arg); } final  int  nonfairTryAcquireShared (int  acquires)      for  (;;) {         int  available = getState();         int  remaining = available - acquires;         if  (remaining < 0  ||             compareAndSetState(available, remaining))             return  remaining;     } }     static  final  class  FairSync  extends  Sync           private  static  final  long  serialVersionUID = 2014338818796000944L ;         FairSync(int  permits) {             super (permits);         }         protected  int  tryAcquireShared (int  acquires)               for  (;;) {                 if  (hasQueuedPredecessors())                     return  -1 ;                 int  available = getState();                 int  remaining = available - acquires;                 if  (remaining < 0  ||                     compareAndSetState(available, remaining))                     return  remaining;             }         }     } 
可以看出,如果remaining <0 即获取许可后,许可数小于0,则获取失败,在doAcquireSharedInterruptibly方法中线程会将自身阻塞,然后入列。
release源代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public  void  release ()      sync.releaseShared(1 ); } public  final  boolean  releaseShared (int  arg)      if  (tryReleaseShared(arg)) {         doReleaseShared();         return  true ;     }     return  false ; } protected  final  boolean  tryReleaseShared (int  releases)      for  (;;) {         int  current = getState();         int  next = current + releases;         if  (next < current)             throw  new  Error("Maximum permit count exceeded" );         if  (compareAndSetState(current, next))             return  true ;     } } 
上述代码看出释放许可就是将AQS中state的值加1。然后通过doReleaseShared唤醒等待队列的第一个节点。可以看出Semaphore使用的是AQS的共享模式,等待队列中的第一个节点,如果第一个节点成功获取许可,又会唤醒下一个节点,以此类推。
CountDownLatch(倒计数门闸锁) 功能概述 CountDownLatch是一个同步的辅助类,允许一个或多个线程,等待其他一组线程完成操作,再继续执行。
CountDownLatch这个类名字的意义。CountDown代表计数递减,Latch是“门闩”的意思。也有人把它称为“屏障”。而CountDownLatch这个类的作用也很贴合这个名字的意义,假设某个线程在执行任务之前,需要等待其它线程完成一些前置任务,必须等所有的前置任务都完成,才能开始执行本线程的任务。
使用场景 主任务开始之前的一些前期准备工作:
倒数计时器:
原理 CountDownLatch是通过一个计数器来实现的,计数器的初始值为需要等待线程的数量。
主线程调用CountDownLatch的await()方法会阻塞当前线程(即:主线程在闭锁上等待),直到计数器的值为0 
当一个工作线程完成了自己的任务后,调用CountDownLatch的countDown()方法,计数器的值就会减1。 
当计数器值为0时,说明所有的工作线程都执行完了,此时,在闭锁上等待的主线程就可以恢复执行任务。 
 
案例  使用 CountDownLatch实现在游戏开始前的前置任务,任务完成后游戏自动开始,比如“加载地图数据”,“加载人物模型”,“加载背景音乐”等等。只有当所有的东西都加载完成后,游戏才开始
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public  class  CountDownLatchDemo           static  class  PreTaskThread  implements  Runnable           private  String task;         private  CountDownLatch countDownLatch;         public  PreTaskThread (String task, CountDownLatch countDownLatch)               this .task = task;             this .countDownLatch = countDownLatch;         }         @Override          public  void  run ()               try  {                 Random random = new  Random();                 Thread.sleep(random.nextInt(1000 ));                 System.out.println(task + " - 任务完成" );                 countDownLatch.countDown();             } catch  (InterruptedException e) {                 e.printStackTrace();             }         }     }     public  static  void  main (String[] args)                    CountDownLatch countDownLatch = new  CountDownLatch(3 );                  new  Thread(() -> {             try  {                 System.out.println("等待数据加载..." );                 System.out.println(String.format("还有%d个前置任务" , countDownLatch.getCount()));                 countDownLatch.await();                 System.out.println("数据加载完成,正式开始游戏!" );             } catch  (InterruptedException e) {                 e.printStackTrace();             }         }).start();                  new  Thread(new  PreTaskThread("加载地图数据" , countDownLatch)).start();         new  Thread(new  PreTaskThread("加载人物模型" , countDownLatch)).start();         new  Thread(new  PreTaskThread("加载背景音乐" , countDownLatch)).start();     } } 
运行结果:
总结:CountDownLatch这个类的作用是,假设某个线程在执行任务之前,需要等待其它线程完成一些前置任务,必须等所有的前置任务都完成,才能开始执行本线程的任务。比如某个线程A等待若干个其他线程执行完任务之后,它才执行。
源码分析 常用方法:
1 2 3 4 5 6 7 public  CountDownLatch (int  count) public  void  await ()  public  boolean  await (long  timeout, TimeUnit unit)  public  void  countDown ()  public  long  getCount ()  
await方法的源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public  void  await ()  throws  InterruptedException     sync.acquireSharedInterruptibly(1 ); }   public  final  void  acquireSharedInterruptibly (int  arg)         throws  InterruptedException  {    if  (Thread.interrupted())         throw  new  InterruptedException();     if  (tryAcquireShared(arg) < 0 )         doAcquireSharedInterruptibly(arg);  }   protected  int  tryAcquireShared (int  acquires)      return  (getState() == 0 ) ? 1  : -1 ; } 
countDown方法的源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public  void  countDown ()      sync.releaseShared(1 ); }   public  final  boolean  releaseShared (int  arg)      if  (tryReleaseShared(arg)) {         doReleaseShared();          return  true ;     }     return  false ; }   protected  boolean  tryReleaseShared (int  releases)           for  (;;) {         int  c = getState();         if  (c == 0 )             return  false ;         int  nextc = c-1 ;         if  (compareAndSetState(c, nextc))             return  nextc == 0 ;     } } 
CyclicBarrier (可重用的倒计数栅栏) 功能概述 CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
与CountDownLatch区别是:
使用场景 CyclicBarrier:一组线程互相等待,当它们都达到各自await()指定的barrier时,它们再同时继续执行各自下面的代码。
经典使用场景是公交发车,比如每辆公交车只要上满 4 个人就发车,后面来的人都会排队依次遵循相应的标准。
原理 CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,在CyclicBarrier的内部定义了一个Lock对象(使用ReentrantLock来实现的),每当一个线程调用CyclicBarrier的await方法时,将剩余拦截的线程数减1,然后判断剩余拦截数是否为0,如果不是,进入Lock对象的条件队列等待。如果是,执行barrierAction对象的Runnable方法,然后将锁的条件队列中的所有线程放入锁等待队列中,这些线程会依次的获取锁、释放锁,接着先从await方法返回,再从CyclicBarrier的await方法中返回。
案例 案例一:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public  class  CyclicBarrierDemo2      static  class  TaskThread  extends  Thread           CyclicBarrier barrier;         public  TaskThread (CyclicBarrier barrier)               this .barrier = barrier;         }         @Override          public  void  run ()               try  {                 Thread.sleep(1000 );                 System.out.println("选手" +getName() + " 到达栅栏 A 就位" );                 barrier.await();                 System.out.println("选手" +getName() + " 走出栅栏 A" );                 Thread.sleep(2000 );                 System.out.println("选手" +getName() + " 到达栅栏 B" );                 barrier.await();                 System.out.println("选手" +getName() + " 走出栅栏 B" );             } catch  (Exception e) {                 e.printStackTrace();             }         }     }     public  static  void  main (String[] args)           int  threadNum = 3 ;         CyclicBarrier barrier = new  CyclicBarrier(threadNum, new  Runnable() {             @Override              public  void  run ()                   System.out.println(Thread.currentThread().getName() + " 人员就位到齐设备自动打开栅栏" );             }         });         for (int  i = 0 ; i < threadNum; i++) {             new  TaskThread(barrier).start();         }     } } 
运行结果:
结果分析:
案例二:如果玩一个游戏有多个“关卡”,那使用CountDownLatch显然不太合适,那需要为每个关卡都创建一个实例。那我们可以使用CyclicBarrier来实现每个关卡的数据加载等待功能。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public  class  CyclicBarrierDemo      static  class  PreTaskThread  implements  Runnable           private  String task;         private  CyclicBarrier cyclicBarrier;         public  PreTaskThread (String task, CyclicBarrier cyclicBarrier)               this .task = task;             this .cyclicBarrier = cyclicBarrier;         }         @Override          public  void  run ()                            for  (int  i = 1 ; i < 4 ; i++) {                 try  {                     Random random = new  Random();                     Thread.sleep(random.nextInt(1000 ));                     System.out.println(String.format("关卡%d的任务%s完成" , i, task));                     cyclicBarrier.await();                 } catch  (InterruptedException | BrokenBarrierException e) {                     e.printStackTrace();                 }                 cyclicBarrier.reset();              }         }     }     public  static  void  main (String[] args)           CyclicBarrier cyclicBarrier = new  CyclicBarrier(3 , () -> {             System.out.println("本关卡所有前置任务完成,开始游戏..." );         });         new  Thread(new  PreTaskThread("加载地图数据" , cyclicBarrier)).start();         new  Thread(new  PreTaskThread("加载人物模型" , cyclicBarrier)).start();         new  Thread(new  PreTaskThread("加载背景音乐" , cyclicBarrier)).start();     } } 
运行结果:
前面提到了CountDownLatch一旦计数值count被降为0后,就不能再重新设置了,它只能起一次“屏障”的作用。而CyclicBarrier拥有CountDownLatch的所有功能,还可以使用reset()方法重置屏障。
源码分析 构造函数:
1 2 3 4 5 6 public  CyclicBarrier (int  parties)      this (parties, null ); } public  int  getParties ()      return  parties; } 
CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction这个Runnable对象,方便处理更复杂的业务场景。
await源码:
1 2 3 4 5 6 7 public  int  await ()  throws  InterruptedException, BrokenBarrierException     try  {         return  dowait(false , 0L );     } catch  (TimeoutException toe) {         throw  new  Error(toe);      } } 
dowait源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 private  int  dowait (boolean  timed, long  nanos)     throws  InterruptedException, BrokenBarrierException,            TimeoutException  {    final  ReentrantLock lock = this .lock;     lock.lock();     try  {         final  Generation g = generation;           if  (g.broken)             throw  new  BrokenBarrierException();           if  (Thread.interrupted()) {             breakBarrier();             throw  new  InterruptedException();         }           int  index = --count;         if  (index == 0 ) {               boolean  ranAction = false ;             try  {                 final  Runnable command = barrierCommand;                 if  (command != null )                     command.run();                 ranAction = true ;                 nextGeneration();                 return  0 ;             } finally  {                 if  (!ranAction)                     breakBarrier();             }         }                    for  (;;) {             try  {                 if  (!timed)                     trip.await();                 else  if  (nanos > 0L )                     nanos = trip.awaitNanos(nanos);             } catch  (InterruptedException ie) {                 if  (g == generation && ! g.broken) {                     breakBarrier();                     throw  ie;                 } else  {                                                                                    Thread.currentThread().interrupt();                 }             }               if  (g.broken)                 throw  new  BrokenBarrierException();               if  (g != generation)                 return  index;               if  (timed && nanos <= 0L ) {                 breakBarrier();                 throw  new  TimeoutException();             }         }     } finally  {         lock.unlock();     } } 
当最后一个线程到达屏障点,也就是执行dowait方法时,会在return 0 返回之前调用finally块中的breakBarrier方法。
breakBarrier源代码:
1 2 3 4 5 private  void  breakBarrier ()      generation.broken = true ;     count = parties;     trip.signalAll(); } 
CycliBarrier对象可以重复使用,重用之前应当调用CyclicBarrier对象的reset方法。
1 2 3 4 5 6 7 8 9 10 public  void  reset ()      final  ReentrantLock lock = this .lock;     lock.lock();     try  {         breakBarrier();            nextGeneration();      } finally  {         lock.unlock();     } } 
总结对比Semaphore、CountDownLatch、CyclicBarrier 
Semaphore其实和锁有点类似,它一般用于控制对某组资源的访问权限。
CountDownLatch和CyclicBarrier都能够实现线程之间的等待。CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。另外,CountDownLatch是不能够重用的,而CyclicBarrier是可以重用。
CyclicBarrier和CoutDownLatch的底层实现也存在一点区别,CountDownLatch底层是直接通过组合一个继承了AQS的同步组件来实现的,而CyclicBarrier并没有直接借助AQS的同步组件,而是通过组合ReentrantLock这把锁来实现的(ReentrantLock的底层实现依然使用的AQS来实现的,归根结底,CyclicBarrier的底层实现也是AQS)。
 
参考资料 Semaphore 使用及原理 Java并发包中Semaphore的工作原理、源码分析及使用示例 CAS与原子操作 通信工具类 从ReentrantLock的实现看AQS的原理及应用