0%

深入浅出Java多线程-AQS及通信工具类

AQS

AQS 简介

AQS是AbstractQueuedSynchronizer的简称,即抽象队列同步器,从字面意思上理解:

  • 抽象:抽象类,只实现一些主要逻辑,有些方法由子类实现;
  • 队列:使用先进先出(FIFO)队列存储数据;
  • 同步:实现了同步的功能。

AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的同步器,比如我们常用到的同步类 ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier,ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。

e8860a5e5c273c648ff38504afa98c63.png

AQS 采用了模版方法实现,子类只需要根据需要去实现自己关心的protected 方法即可造出符合我们自己需求的同步器。

AQS 的数据结构

AQS内部使用了一个volatile的变量state来作为资源的标识。同时定义了几个获取和改版state的protected方法。

1
2
3
4
5
6
7
8
9
10
11
protected final int getState() {
return this.state;
}

protected final void setState(int newState) {
this.state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
return STATE.compareAndSet(this, expect, update);
}

这三种叫做均是原子操作,其中compareAndSetState的实现依赖于Unsafe的compareAndSwapInt()方法。

而AQS类本身实现的是一些排队和阻塞的机制,比如具体线程等待队列的维护(如获取资源失败入队/唤醒出队等)。它内部使用了一个先进先出(FIFO)的双端队列,并使用了两个指针head和tail用于标识队列的头部和尾部。但它并不是直接储存线程,而是储存拥有线程的Node节点。其数据结构如图:

e3144b5ad4ae0bf1b473687efa2d487e.png

资源共享模式

资源有两种共享模式,或者说两种同步方式:

  • 独占模式(Exclusive):资源是独占的,一次只能一个线程获取。如ReentrantLock。
  • 共享模式(Share):同时可以被多个线程获取,具体的资源个数可以通过参数指定。如Semaphore/CountDownLatch。

一般情况下,子类只需要根据需求实现其中一种模式,当然也有同时实现两种模式的同步类,如ReadWriteLock。

AQS中关于这两种资源共享模式的定义源码(均在内部类Node中)。我们来看看Node的结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
static final class Node {
// 标记一个结点(对应的线程)在共享模式下等待
static final Node SHARED = new Node();
// 标记一个结点(对应的线程)在独占模式下等待
static final Node EXCLUSIVE = null;

// waitStatus的值,表示该结点(对应的线程)已被取消
static final int CANCELLED = 1;
// waitStatus的值,表示后继结点(对应的线程)需要被唤醒
static final int SIGNAL = -1;
// waitStatus的值,表示该结点(对应的线程)在等待某一条件
static final int CONDITION = -2;
/*waitStatus的值,表示有资源可用,新head结点需要继续唤醒后继结点(共享模式下,多线程并发释放资源,而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)*/
static final int PROPAGATE = -3;

// 等待状态,取值范围,-3,-2,-1,0,1
volatile int waitStatus;
volatile Node prev; // 前驱结点
volatile Node next; // 后继结点
volatile Thread thread; // 结点对应的线程
Node nextWaiter; // 等待队列里下一个等待条件的结点


// 判断共享模式的方法
final boolean isShared() {
return nextWaiter == SHARED;
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

// 其它方法忽略,可以参考具体的源码
}

// AQS里面的addWaiter私有方法
private Node addWaiter(Node mode) {
// 使用了Node的这个构造函数
Node node = new Node(Thread.currentThread(), mode);
// 其它代码省略
}

通过Node我们可以实现两个队列,一是通过prev和next实现CLH队列(线程同步队列,双向队列),二是nextWaiter实现Condition条件上的等待线程队列(单向队列),这个Condition主要用在ReentrantLock类中。

AQS的主要方法源码解析

使用模版方法定好算法骨架

AQS的设计是基于模板方法模式的,AQS主要定义了下面几个模版方法来获取资源:

  • acquire :独占模式,不可中断
  • acquireInterruptibly:独占模式,可中断(在线程中断时可能会抛出InterruptedException)
  • acquireShared:共享模式,不可中断
  • acquireSharedInterruptibly:共享模式,可中断 (在线程中断时可能会抛出InterruptedException)

下面以 acquire 为例看下AQS的模版方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//获取独占资源
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

//自旋不间断的从等待队列用头节点尝试获取资源(独占方式)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

//留给子类去实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

如上面的代码所示,AQS 已经定义好对应的获取锁的代码模版(算法骨架),子类可以根据需要去实现对应的方法即可,比如 Semaphore 只需要实现 tryAcquire 即可。通过使用模版方法,让AQS实现基本功能的同时又保持了良好的扩展性。

1
2
3
4
//留给子类去实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

注意,这里不使用抽象方法的目的是:避免强迫子类中把所有的抽象方法都实现一遍,减少无用功,这样子类只需要实现自己关心的抽象方法即可,比如 Semaphore 只需要实现 tryAcquire 方法而不用实现其余不需要用到的方法(似的用法在 AbstractList的addall实现。提供默认的方法method1…method4方法,每个方法直接抛出异常,使用模板方法的时候强制重写用到的method方法,用不到的method不用重写。)

AQS 预留给子类进行扩展的方法主要有下面几个:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

子类根据需要去实现即可。

以独占方式获取资源为例

AQS获取资源的流程:
a7f37bf6e341e74f6c60d06c511aa2ca.png

获取资源的入口是acquire(int arg)方法。arg是要获取的资源的个数,在独占模式下始终为1。我们先来看看这个方法的逻辑:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

首先调用tryAcquire(arg)尝试去获取资源。前面提到了这个方法是在子类具体实现的。

如果获取资源失败,就通过addWaiter(Node.EXCLUSIVE)方法把这个线程插入到等待队列中。其中传入的参数代表要插入的Node是独占式的。这个方法的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private Node addWaiter(Node mode) {
// 生成该线程对应的Node节点
Node node = new Node(Thread.currentThread(), mode);
// 将Node插入队列中
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 使用CAS尝试,如果成功就返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果等待队列为空或者上述CAS失败,再自旋CAS插入
enq(node);
return node;
}

// 自旋CAS插入等待队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

上面的两个函数比较好理解,就是在队列的尾部插入新的Node节点,但是需要注意的是由于AQS中会存在多个线程同时争夺资源的情况,因此肯定会出现多个线程同时插入节点的操作,在这里是通过CAS自旋的方式保证了操作的线程安全性。

现在回到最开始的aquire(int arg)方法。现在通过addWaiter方法,已经把一个Node放到等待队列尾部了。而处于等待队列的结点是从头结点一个一个去获取资源的。具体的实现我们来看看acquireQueued方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
// 标记等待过程中是否中断过
boolean interrupted = false;
// 开始自旋,要么获取锁,要么中断
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点,双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位)
if (p == head && tryAcquire(arg)) {
// 获取锁成功,头指针移动到当前node
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

跳出当前循环的条件是当“前置节点是头结点,且当前线程获取锁成功”。为了防止因死循环导致CPU资源被浪费,我们会判断前置节点的状态来决定是否要将当前线程挂起,shouldParkAfterFailedAcquire代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 靠前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取头结点的节点状态
int ws = pred.waitStatus;
// 说明头结点处于唤醒状态, waitStatus为-1,表示线程已经准备好了,就等资源释放了。
if (ws == Node.SIGNAL)
return true;
// 通过枚举值我们知道waitStatus>0是取消状态
if (ws > 0) {
do {
// 循环向前查找取消节点,把取消节点从队列中剔除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 设置前任节点等待状态为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

shouldParkAfterFailedAcquire流程如下:

资源释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

private void unparkSuccessor(Node node) {
// 如果状态是负数,尝试把它设置为0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 得到头结点的后继结点head.next
Node s = node.next;
// 如果这个后继结点为空或者状态大于0
// 通过前面的定义我们知道,大于0只有一种可能,就是这个结点已被取消
if (s == null || s.waitStatus > 0) {
s = null;
// 等待队列中所有还有用的结点,都向前移动
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果后继结点不为空,
if (s != null)
LockSupport.unpark(s.thread);
}

通信工具类

我们常用到的通信工具类主要都是依靠借助AQS实现的(如semaphore、CountDownLatch、CyclicBarrier),实现的方式就是前面所说的根据需要去实现AQS预留的一些拓展方法:

AQS 预留给子类进行扩展的方法主要有下面几个:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

JDK中提供了一些工具类以供开发者使用。这样的话我们在遇到一些常见的应用场景时就可以使用这些工具类,而不用自己再重复造轮子了。

4d7f25b785c31cfd82f66d85270a98be.png

Semaphore(信号量)

功能概述

Java提供了经典信号量Semaphore的实现,它通过控制一定数量的许可(permit)的方式,来达到限制通用资源访问的目的。Semaphore往往用于资源有限的场景中,用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。例如:控制并发的线程数。

可以把它简单的理解成我们停车场入口立着的那个显示屏,每有一辆车进入停车场显示屏就会显示剩余车位减1,每有一辆车从停车场出去,显示屏上显示的剩余车辆就会加1,当显示屏上的剩余车位为0时,停车场入口的栏杆就不会再打开,车辆就无法进入停车场了,直到有一辆车从停车场出去为止。

使用场景

比如:控制并发的线程数

比如:数据库连接池,同时进行连接的线程有数量限制,连接不能超过一定的数量,当连接达到了限制数量后,后面的线程只能排队等前面的线程释放了数据库连接才能获得数据库连接。

比如:停车场场景,车位数量有限,同时只能容纳多少台车,车位满了之后只有等里面的车离开停车场外面的车才可以进入。

原理

Semaphore是通过一个计数器(记录许可证的数量)来实现的:
1.线程通过acquire()方法获取许可证(计数器的值减1),只有获取到许可证才可以继续执行下去,否则阻塞当前线程。
2.线程通过release()方法归还许可证(计数器的值加1)。
3.如果许可证减少到了0,再有其他线程来acquire,那就要阻塞这个线程直到有其它线程release permit为止。

常用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

//获取一个令牌,在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态。
acquire()

//获取一个令牌,在获取到令牌、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态。
acquire(int permits)

//获取一个令牌,在获取到令牌之前线程一直处于阻塞状态(忽略中断)。
acquireUninterruptibly()


//尝试获得令牌,返回获取令牌成功或失败,不阻塞线程。
tryAcquire()


//尝试获得令牌,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。
tryAcquire(long timeout, TimeUnit unit)


//释放一个令牌,唤醒一个获取令牌不成功的阻塞线程。
release()


//等待队列里是否还存在等待线程。
hasQueuedThreads()


//获取等待队列里阻塞的线程数。
getQueueLength()


//清空令牌把可用令牌数置为0,返回清空令牌的数量。
drainPermits()


//返回可用的令牌数量。
availablePermits()


案例

1、假如有一个落魄的停车场只能容纳3辆车。
2、当一辆车进入停车场后,显示牌的剩余车位数 -1.
3、每有一辆车驶出停车场后,显示牌的剩余车位 +1。
4、停车场剩余车位不足时,车辆只能在外面等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class SemaphoreDemo {
static class MyThread implements Runnable {

private int value;
private Semaphore semaphore;

public MyThread(int value, Semaphore semaphore) {
this.value = value;
this.semaphore = semaphore;
}

@Override
public void run() {
try {
semaphore.acquire(); // 获取permit

System.out.println(String.format("车辆 粤A***%d 驶入, 还剩%d个车位,还有%d个车辆在等待",
value, semaphore.availablePermits(), semaphore.getQueueLength()));
// 睡眠随机时间,打乱释放顺序
Random random = new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(String.format("车辆 粤A%d 已经驶出", value));
} catch (InterruptedException e) {
e.printStackTrace();
} finally{
semaphore.release(); // 释放permit
}
}
}

public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(new MyThread(i, semaphore)).start();
}
}
}

运行结果:
7edc671804e6e24a7bdde6803dc0b005.png

结果分析:最开始是A3, A0, A4 这辆车获得了资源车位,而其它车辆进入了等待队列。然后当A0辆驶出释放车位后,在排队的车辆粤A***2才得以获取了刚释放的车位。

总结:Semaphore往往用于资源有限的场景中,去限制线程的数量。最主要的方法是acquire方法和release方法。acquire()方法会申请一个permit,而release方法会释放一个permit。如果减少到了0,再有其他线程来acquire,那就要阻塞这个线程直到有其它线程release permit为止。

源码解析

构造函数

1
2
3
4
5
6
7
8
9
//非公平的构造函数
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

//通过fair参数决定公平性
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Semaphore有两个构造函数:

  • 参数permits表示许可数,它最后传递给了AQS的state值。线程在运行时首先获取许可,如果成功,许可数就减1,线程运行,当线程运行结束就释放许可,许可数就加1。如果许可数为0,则获取失败,线程位于AQS的等待队列中,它会被其它释放许可的线程唤醒。

  • fair,用于指定它的公平性。一般常用非公平的信号量,非公平信号量是指在获取许可时先尝试获取许可,而不必关心是否已有需要获取许可的线程位于等待队列中,如果获取失败,才会入列。而公平的信号量在获取许可时首先要查看等待队列中是否已有线程,如果有则入列排队。

acquire源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

可以看出,如果remaining <0 即获取许可后,许可数小于0,则获取失败,在doAcquireSharedInterruptibly方法中线程会将自身阻塞,然后入列。

release源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void release() {
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}

上述代码看出释放许可就是将AQS中state的值加1。然后通过doReleaseShared唤醒等待队列的第一个节点。可以看出Semaphore使用的是AQS的共享模式,等待队列中的第一个节点,如果第一个节点成功获取许可,又会唤醒下一个节点,以此类推。

CountDownLatch(倒计数门闸锁)

功能概述

CountDownLatch是一个同步的辅助类,允许一个或多个线程,等待其他一组线程完成操作,再继续执行。

CountDownLatch这个类名字的意义。CountDown代表计数递减,Latch是“门闩”的意思。也有人把它称为“屏障”。而CountDownLatch这个类的作用也很贴合这个名字的意义,假设某个线程在执行任务之前,需要等待其它线程完成一些前置任务,必须等所有的前置任务都完成,才能开始执行本线程的任务。

使用场景

主任务开始之前的一些前期准备工作:
在游戏真正开始之前,一般会等待一些前置任务完成,比如“加载地图数据”,“加载人物模型”,“加载背景音乐”等等

倒数计时器:
比如:一种典型的场景就是火箭发射。在火箭发射前,为了保证万无一失,往往还要进行各项设备、仪器的检查。 只有等所有检查完毕后,引擎才能点火。这种场景就非常适合使用CountDownLatch。
它可以使得点火线程,等待所有检查线程全部完工后,再执行

原理

CountDownLatch是通过一个计数器来实现的,计数器的初始值为需要等待线程的数量。

  • 主线程调用CountDownLatch的await()方法会阻塞当前线程(即:主线程在闭锁上等待),直到计数器的值为0
  • 当一个工作线程完成了自己的任务后,调用CountDownLatch的countDown()方法,计数器的值就会减1。
  • 当计数器值为0时,说明所有的工作线程都执行完了,此时,在闭锁上等待的主线程就可以恢复执行任务。

案例

使用 CountDownLatch实现在游戏开始前的前置任务,任务完成后游戏自动开始,比如“加载地图数据”,“加载人物模型”,“加载背景音乐”等等。只有当所有的东西都加载完成后,游戏才开始

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class CountDownLatchDemo {

// 定义前置任务线程
static class PreTaskThread implements Runnable {

private String task;
private CountDownLatch countDownLatch;

public PreTaskThread(String task, CountDownLatch countDownLatch) {
this.task = task;
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
try {
Random random = new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(task + " - 任务完成");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
// 假设有三个模块需要加载
CountDownLatch countDownLatch = new CountDownLatch(3);

// 主任务
new Thread(() -> {
try {
System.out.println("等待数据加载...");
System.out.println(String.format("还有%d个前置任务", countDownLatch.getCount()));
countDownLatch.await();
System.out.println("数据加载完成,正式开始游戏!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

// 前置任务
new Thread(new PreTaskThread("加载地图数据", countDownLatch)).start();
new Thread(new PreTaskThread("加载人物模型", countDownLatch)).start();
new Thread(new PreTaskThread("加载背景音乐", countDownLatch)).start();
}
}

运行结果:
2e7ccffac954ad808ddd0725b65bfaff.png

总结:CountDownLatch这个类的作用是,假设某个线程在执行任务之前,需要等待其它线程完成一些前置任务,必须等所有的前置任务都完成,才能开始执行本线程的任务。比如某个线程A等待若干个其他线程执行完任务之后,它才执行。

源码分析

常用方法:

1
2
3
4
5
6
7
// 构造方法:
public CountDownLatch(int count)

public void await() // 等待
public boolean await(long timeout, TimeUnit unit) // 超时等待
public void countDown() // count - 1
public long getCount() // 获取当前还有多少count

await方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg); //doAcquireSharedInterruptibly 主要实现线程的入列与阻塞
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

countDown方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void countDown() {
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared(); //主要实现唤醒第一个节点,第一个节点有会唤醒第二个节点
return true;
}
return false;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

CyclicBarrier (可重用的倒计数栅栏)

功能概述

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
和CountDownLatch相似,也是等待某些线程都做完以后再执行。

与CountDownLatch区别是:
前面提到了CountDownLatch一旦计数值count被降为0后,就不能再重新设置了,它只能起一次“屏障”的作用。而CyclicBarrier拥有CountDownLatch的所有功能,还可以使用reset()方法重置屏障,使计数器可以反复使用。比如,假设我们将计数器设置为10。那么凑齐第一批1 0个线程后,计数器就会归零,然后接着凑齐下一批10个线程。

使用场景

CyclicBarrier:一组线程互相等待,当它们都达到各自await()指定的barrier时,它们再同时继续执行各自下面的代码。

经典使用场景是公交发车,比如每辆公交车只要上满 4 个人就发车,后面来的人都会排队依次遵循相应的标准。

原理

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,在CyclicBarrier的内部定义了一个Lock对象(使用ReentrantLock来实现的),每当一个线程调用CyclicBarrier的await方法时,将剩余拦截的线程数减1,然后判断剩余拦截数是否为0,如果不是,进入Lock对象的条件队列等待。如果是,执行barrierAction对象的Runnable方法,然后将锁的条件队列中的所有线程放入锁等待队列中,这些线程会依次的获取锁、释放锁,接着先从await方法返回,再从CyclicBarrier的await方法中返回。

案例

案例一:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class CyclicBarrierDemo2 {

static class TaskThread extends Thread {
CyclicBarrier barrier;

public TaskThread(CyclicBarrier barrier) {
this.barrier = barrier;
}

@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println("选手"+getName() + " 到达栅栏 A 就位");
barrier.await();
System.out.println("选手"+getName() + " 走出栅栏 A");

Thread.sleep(2000);
System.out.println("选手"+getName() + " 到达栅栏 B");
barrier.await();
System.out.println("选手"+getName() + " 走出栅栏 B");
} catch (Exception e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
int threadNum = 3;
CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 人员就位到齐设备自动打开栅栏");
}
});

for(int i = 0; i < threadNum; i++) {
new TaskThread(barrier).start();
}
}
}

运行结果:
283e5359775239b91165d4c52187cc04.png

结果分析:
一开始选手到达栅栏A后会被阻塞,待选手都到齐后,栅栏A会自动打开,所有选手同时执行 barrier.await() 的动作,然后优先到达栅栏B后又会被阻塞,待选手都到达栅栏B后,栅栏B会自动打开,所有选手同时执行后面的动作

案例二:如果玩一个游戏有多个“关卡”,那使用CountDownLatch显然不太合适,那需要为每个关卡都创建一个实例。那我们可以使用CyclicBarrier来实现每个关卡的数据加载等待功能。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class CyclicBarrierDemo {
static class PreTaskThread implements Runnable {

private String task;
private CyclicBarrier cyclicBarrier;

public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
this.task = task;
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
// 假设总共三个关卡
for (int i = 1; i < 4; i++) {
try {
Random random = new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(String.format("关卡%d的任务%s完成", i, task));
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
cyclicBarrier.reset(); // 重置屏障
}
}
}

public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
System.out.println("本关卡所有前置任务完成,开始游戏...");
});

new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();
new Thread(new PreTaskThread("加载人物模型", cyclicBarrier)).start();
new Thread(new PreTaskThread("加载背景音乐", cyclicBarrier)).start();
}
}

运行结果:
b24501a2fa80be6bc622f7485fc85dfa.png

前面提到了CountDownLatch一旦计数值count被降为0后,就不能再重新设置了,它只能起一次“屏障”的作用。而CyclicBarrier拥有CountDownLatch的所有功能,还可以使用reset()方法重置屏障。

源码分析

构造函数:

1
2
3
4
5
6
public CyclicBarrier(int parties) {
this(parties, null);
}
public int getParties() {
return parties;
}

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction这个Runnable对象,方便处理更复杂的业务场景。

await源码:

1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

dowait源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

当最后一个线程到达屏障点,也就是执行dowait方法时,会在return 0 返回之前调用finally块中的breakBarrier方法。

breakBarrier源代码:

1
2
3
4
5
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}

CycliBarrier对象可以重复使用,重用之前应当调用CyclicBarrier对象的reset方法。
reset源码:

1
2
3
4
5
6
7
8
9
10
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}

总结对比Semaphore、CountDownLatch、CyclicBarrier

  • Semaphore其实和锁有点类似,它一般用于控制对某组资源的访问权限。

  • CountDownLatch和CyclicBarrier都能够实现线程之间的等待。CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。另外,CountDownLatch是不能够重用的,而CyclicBarrier是可以重用。

  • CyclicBarrier和CoutDownLatch的底层实现也存在一点区别,CountDownLatch底层是直接通过组合一个继承了AQS的同步组件来实现的,而CyclicBarrier并没有直接借助AQS的同步组件,而是通过组合ReentrantLock这把锁来实现的(ReentrantLock的底层实现依然使用的AQS来实现的,归根结底,CyclicBarrier的底层实现也是AQS)。
    由于CyclicBarrier是使用ReentrantLock来实现的,因此它有个属性是lock。

参考资料

Semaphore 使用及原理
Java并发包中Semaphore的工作原理、源码分析及使用示例
CAS与原子操作
通信工具类
从ReentrantLock的实现看AQS的原理及应用