0%

深入浅出Java多线程-ReentrantLock

线程安全的实现方法概述

线程安全的实现方法,可以分为 阻塞同步,非阻塞同步,无同步方案三大类:

58567e99523dabb30de128bae6916038.png

其中阻塞同步最基本的两个实现手段就是通过 synchronized 或 ReentrantLock 关键字。这篇文章主要记录下 RentrantLock 的源码实现。

ReentrantLock 是基于 抽象类 AbstractQueuedSynchronizer 实现的,AQS内部使用了一个volatile的变量state 来作为资源的标识,并提供了一些模版方法,ReentrantLock 通过实现 AQS 的 tryAcquire 实现获取锁的逻辑。

ReentrantLock 的锁类型

ReentrantLock 分为公平锁和非公平锁,可以通过构造方法来指定具体类型:

1
2
3
4
5
6
7
8
//默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

ReentrantLock 使用示例模版

通常的使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
private ReentrantLock lock = new ReentrantLock();
public void run() {
lock.lock();
try {
//do bussiness
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

ReentrantLock 的源码实现

公平锁的实现

1
2
3
4
5
6
7
public void lock() {
sync.lock();
}

final void lock() {
acquire(1);
}

可以看到,ReentrantLock 的lock 方法,其实是通过调用 AQS 提供的模版方法 acquire 来获取独占锁的。

AbstractQueuedSynchronizer 的模版方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
//通过  中的 acquire() 获取独占资源
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}


//尝试获取锁的细节,留给子类去实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

获取锁

ReentrantLock 的 tryAcquire实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//判断 AQS 中的 state 是否等于 0,0 表示目前没有其他线程获得锁,当前线程就可以尝试获取锁。
if (c == 0) {
//如果AQS的队列中没有其他线程,则尝试获取锁(公平锁才有判断这个)
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果statc 不等于0,即锁以及被获取了,这里则进行释放重入的判断
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

可以看到在 ReentrantLock 中判断锁是否被获取,是通过 state 来判断的, state =0 则表示目前没有其他线程获得锁,当前线程就可以尝试获取锁;state 大于0 则表示锁已经被占用,则判断占用锁的线程是否就是当前线程,如果是当前线程,则 state +1, 表示重入数 +1。

注意:尝试之前会利用 hasQueuedPredecessors() 方法来判断 AQS 的队列中是否有其他线程,如果有则不会尝试获取锁(获取公平锁才有这个判断)。

如果队列中没有线程就利用 CAS 来将 AQS 中的 state 修改为1,也就是获取锁,获取成功则将当前线程置为获得锁的独占线程(setExclusiveOwnerThread(current))。

获取锁失败则添加到队列中

如果 tryAcquire(arg) 获取锁失败,则需要用 addWaiter(Node.EXCLUSIVE) 将当前线程包装为一个Node节点写入队列中(AQS 中的双端队列是由 Node 节点组成的双向链表实现的)。其中传入的参数代表要插入的Node是独占式的。

入队的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

private Node addWaiter(Node mode) {
// 生成该线程对应的Node节点
Node node = new Node(Thread.currentThread(), mode);
// 将Node插入队列中
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 使用CAS尝试,如果成功就返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果等待队列为空或者上述CAS失败,再自旋CAS插入
enq(node);
return node;
}

// 自旋CAS插入等待队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

首先判断队列是否为空,不为空时则将封装好的 Node 利用 CAS 写入队尾,如果出现并发写入失败就需要调用 enq(node); 来写入了。在enq中通过 自旋加上 CAS 来保证写入队列成功。

挂起等待线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
// 标记等待过程中是否中断过
boolean interrupted = false;
// 开始自旋,要么获取锁,要么中断
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点,双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位)
if (p == head && tryAcquire(arg)) {
// 获取锁成功,头指针移动到当前node
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

首先会根据 node.predecessor() 获取到上一个节点是否为头节点,如果是则尝试获取一次锁,获取锁成功后将。node 节点设置成头节点。

如果不是头节点,或者获取锁失败,则会根据上一个节点的 waitStatus 状态来处理(shouldParkAfterFailedAcquire(p, node))。

waitStatus 用于记录当前节点的状态,如节点取消、节点等待等。

shouldParkAfterFailedAcquire(p, node) 返回当前线程是否需要挂起,如果需要则调用 parkAndCheckInterrupt():

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

非公平锁的实现

公平锁和非公平锁的主要差异在于实现上:公平锁会先判断队列中是否已经有在排队的线程,如果有则自觉加入到队列尾排队,而非公平锁则不会有此顾忌,非公平锁的实现上是抢占模式的,线程一进来就会先尝试获取锁,不需要考虑先来后到之类的规则。

非公平锁与公平锁获取的差异:

1
2
3
4
5
6
7
8
9
10
11
12
13
//非公平锁
final void lock() {
//直接尝试获取锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

//公平锁
final void lock() {
acquire(1);
}

还要一个重要的区别是在尝试获取锁时tryAcquire(arg),非公平锁是不需要判断队列中是否还有其他线程,也是直接尝试获取锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//没有 !hasQueuedPredecessors() 判断
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

释放锁

公平锁和非公平锁的释放流程都是一样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒被挂起的线程
unparkSuccessor(h);
return true;
}
return false;
}
//尝试释放锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

首先会判断当前线程是否为获得锁的线程,由于是重入锁所以需要将 state 减到 0 才认为完全释放锁。释放之后需要调用 unparkSuccessor(h) 来唤醒被挂起的线程。

公平锁与非公平锁小结

非公平锁的效率会被公平锁更高。因为公平锁需要关心队列的情况,得按照队列里的先后顺序来获取锁(会造成大量的线程上下文切换),而非公平锁的实现上是抢占模式的,线程一进来就会先尝试获取锁,没有先来后到的规则限制。

ReentrantLock 与 synchronized 的一个对比(怎么选择)

synchronized 与 ReentrantLock 都是可重入的,但相比于 synchronized,ReentrantLock增加了一些高级的功能,主要有以下三项:等待可中断,可实现公平锁,以及锁可以绑定多个条件。

  • 等待可中断是指当持有锁的线程长期不释放锁的时候,正在等待的线程可以放弃等待,该为处理其他事情,可中断特性对于处理执行时间非常长的同步块很有帮助。

  • 公平锁是指多个线程等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁;而非公平锁则不保证这一点,在锁被释放时,任何一个等待锁的线程都有机会获得锁。synchronized中的锁上非公平的,ReentrantLock默认情况下也是非公平的,但可以通过带布尔值的构造函数要求使用公平锁。

  • 锁绑定多个条件是指 ReentrantLock 对象可以同时绑定多个 Condition 对象,而在 synchronized 中,锁对象的 wait()、notify() 和 notifyAll() 方法只可以实现一个隐含的条件,如果要和多余一个的条件关联的时候,就不得不额外地添加一个锁,而ReentrantLock则可以通过多次调用 newCondition() 方法实现。

通过互斥同步来实现线程安全时,如果需要用到上述功能,则选用 ReentrantLock。如果不需要上面的功能则选用synchronized。因为性能方面,JDK1.6 发布后,synchronized 与 ReentrantLock 的性能方面已经基本上没什么差异了,并且JVM 更倾向于优化改进更偏向于原生的 synchronized。

HotSpot 虚拟机团队在 JDK1.6 开始,已对synchronized 进行了多种锁优化,比如自适应自旋、锁消除、锁粗化、轻量级锁、和偏向锁等。这些技术都是为了在线程之间优先以更高效的方式决绝竞争问题,从而提供程序的执行效率。

参考文献

  1. 《深入理解Java虚拟机》
  2. ReentrantLock 实现原理
  3. 从ReentrantLock的实现看AQS的原理及应用