软件编程
位置:首页>> 软件编程>> java编程>> ReentrantLock源码详解--公平锁、非公平锁

ReentrantLock源码详解--公平锁、非公平锁

作者:彤哥读源码  发布时间:2023-04-22 17:18:56 

标签:ReentrantLock,源码,公平锁,非公平锁

问题

(1)重入锁是什么?

(2)ReentrantLock如何实现重入锁?

(3)ReentrantLock为什么默认是非公平模式?

(4)ReentrantLock除了可重入还有哪些特性?

简介

Reentrant = Re + entrant,Re是重复、又、再的意思,entrant是enter的名词或者形容词形式,翻译为进入者或者可进入的,所以Reentrant翻译为可重复进入的、可再次进入的,因此ReentrantLock翻译为重入锁或者再入锁。

重入锁,是指一个线程获取锁之后再尝试获取锁时会自动获取锁。

在Java中,除了ReentrantLock以外,synchronized也是重入锁。

那么,ReentrantLock的可重入性是怎么实现的呢?

继承体系

ReentrantLock源码详解--公平锁、非公平锁

ReentrantLock实现了Lock接口,Lock接口里面定义了java中锁应该实现的几个方法:


// 获取锁
void lock();
// 获取锁(可中断)
void lockInterruptibly() throws InterruptedException;
// 尝试获取锁,如果没获取到锁,就返回false
boolean tryLock();
// 尝试获取锁,如果没获取到锁,就等待一段时间,这段时间内还没获取到锁就返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
// 条件锁
Condition newCondition();

Lock接口中主要定义了 获取锁、尝试获取锁、释放锁、条件锁等几个方法。

源码分析

主要内部类

ReentrantLock中主要定义了三个内部类:Sync、NonfairSync、FairSync。


abstract static class Sync extends AbstractQueuedSynchronizer {}
static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}

(1)抽象类Sync实现了AQS的部分方法;

(2)NonfairSync实现了Sync,主要用于非公平锁的获取;

(3)FairSync实现了Sync,主要用于公平锁的获取。

在这里我们先不急着看每个类具体的代码,等下面学习具体的功能点的时候再把所有方法串起来。

主要属性


private final Sync sync;

主要属性就一个sync,它在构造方法中初始化,决定使用公平锁还是非公平锁的方式获取锁。

主要构造方法


// 默认构造方法
public ReentrantLock() {
sync = new NonfairSync();
}
// 自己可选择使用公平锁还是非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

(1)默认构造方法使用的是非公平锁;

(2)第二个构造方法可以自己决定使用公平锁还是非公平锁;

上面我们分析了ReentrantLock的主要结构,下面我们跟着几个主要方法来看源码。

lock()方法

彤哥贴心地在每个方法的注释都加上方法的来源。

1.公平锁

这里我们假设ReentrantLock的实例是通过以下方式获得的:


ReentrantLock reentrantLock = new ReentrantLock(true);

下面的是加锁的主要逻辑:


// ReentrantLock.lock()
public void lock() {
// 调用的sync属性的lock()方法
// 这里的sync是公平锁,所以是FairSync的实例
sync.lock();
}
// ReentrantLock.FairSync.lock()
final void lock() {
// 调用AQS的acquire()方法获取锁
// 注意,这里传的值为1
acquire(1);
}
// AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) {
// 尝试获取锁
// 如果失败了,就排队
if (!tryAcquire(arg) &&
// 注意addWaiter()这里传入的节点模式为独占模式
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// ReentrantLock.FairSync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
// 当前线程
final Thread current = Thread.currentThread();
// 查看当前状态变量的值
int c = getState();
// 如果状态变量的值为0,说明暂时还没有人占有锁
if (c == 0) {
// 如果没有其它线程在排队,那么当前线程尝试更新state的值为1
// 如果成功了,则说明当前线程获取了锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 当前线程获取了锁,把自己设置到exclusiveOwnerThread变量中
// exclusiveOwnerThread是AQS的父类AbstractOwnableSynchronizer中提供的变量
setExclusiveOwnerThread(current);
// 返回true说明成功获取了锁
return true;
}
}
// 如果当前线程本身就占有着锁,现在又尝试获取锁
// 那么,直接让它获取锁并返回true
else if (current == getExclusiveOwnerThread()) {
// 状态变量state的值加1
int nextc = c + acquires;
// 如果溢出了,则报错
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 设置到state中
// 这里不需要CAS更新state
// 因为当前线程占有着锁,其它线程只会CAS把state从0更新成1,是不会成功的
// 所以不存在竞争,自然不需要使用CAS来更新
setState(nextc);
// 当线程获取锁成功
return true;
}
// 当前线程尝试获取锁失败
return false;
}
// AbstractQueuedSynchronizer.addWaiter()
// 调用这个方法,说明上面尝试获取锁失败了
private Node addWaiter(Node mode) {
// 新建一个节点
Node node = new Node(Thread.currentThread(), mode);
// 这里先尝试把新节点加到尾节点后面
// 如果成功了就返回新节点
// 如果没成功再调用enq()方法不断尝试
Node pred = tail;
// 如果尾节点不为空
if (pred != null) {
// 设置新节点的前置节点为现在的尾节点
node.prev = pred;
// CAS更新尾节点为新节点
if (compareAndSetTail(pred, node)) {
// 如果成功了,把旧尾节点的下一个节点指向新节点
pred.next = node;
// 并返回新节点
return node;
}
}
// 如果上面尝试入队新节点没成功,调用enq()处理
enq(node);
return node;
}
// AbstractQueuedSynchronizer.enq()
private Node enq(final Node node) {
// 自旋,不断尝试
for (;;) {
Node t = tail;
// 如果尾节点为空,说明还未初始化
if (t == null) { // Must initialize
// 初始化头节点和尾节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 如果尾节点不为空
// 设置新节点的前一个节点为现在的尾节点
node.prev = t;
// CAS更新尾节点为新节点
if (compareAndSetTail(t, node)) {
// 成功了,则设置旧尾节点的下一个节点为新节点
t.next = node;
// 并返回旧尾节点
return t;
}
}
}
}
// AbstractQueuedSynchronizer.acquireQueued()
// 调用上面的addWaiter()方法使得新节点已经成功入队了
// 这个方法是尝试让当前节点来获取锁的
final boolean acquireQueued(final Node node, int arg) {
// 失败标记
boolean failed = true;
try {
// 中断标记
boolean interrupted = false;
// 自旋
for (;;) {
// 当前节点的前一个节点
final Node p = node.predecessor();
// 如果当前节点的前一个节点为head节点,则说明轮到自己获取锁了
// 调用ReentrantLock.FairSync.tryAcquire()方法再次尝试获取锁
if (p == head && tryAcquire(arg)) {
// 尝试获取锁成功
// 这里同时只会有一个线程在执行,所以不需要用CAS更新
// 把当前节点设置为新的头节点
setHead(node);
// 并把上一个节点从链表中删除
p.next = null; // help GC
// 未失败
failed = false;
return interrupted;
}
// 是否需要阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
// 真正阻塞的方法
parkAndCheckInterrupt())
// 如果中断了
interrupted = true;
}
} finally {
// 如果失败了
if (failed)
// 取消获取锁
cancelAcquire(node);
}
}
// AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire()
// 这个方法是在上面的for()循环里面调用的
// 第一次调用会把前一个节点的等待状态设置为SIGNAL,并返回false
// 第二次调用才会返回true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 上一个节点的等待状态
// 注意Node的waitStatus字段我们在上面创建Node的时候并没有指定
// 也就是说使用的是默认值0
// 这里把各种等待状态再贴出来
//static final int CANCELLED = 1;
//static final int SIGNAL = -1;
//static final int CONDITION = -2;
//static final int PROPAGATE = -3;
int ws = pred.waitStatus;
// 如果等待状态为SIGNAL(等待唤醒),直接返回true
if (ws == Node.SIGNAL)
return true;
// 如果前一个节点的状态大于0,也就是已取消状态
if (ws > 0) {
// 把前面所有取消状态的节点都从链表中删除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前一个节点的状态小于等于0,则把其状态设置为等待唤醒
// 这里可以简单地理解为把初始状态0设置为SIGNAL
// CONDITION是条件锁的时候使用的
// PROPAGATE是共享锁使用的
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// AbstractQueuedSynchronizer.parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
// 阻塞当前线程
// 底层调用的是Unsafe的park()方法
LockSupport.park(this);
// 返回是否已中断
return Thread.interrupted();
}

下面我们看一下主要方法的调用关系,可以跟着我的 → 层级在脑海中大概过一遍每个方法的主要代码:


ReentrantLock#lock()
->ReentrantLock.FairSync#lock() // 公平模式获取锁
->AbstractQueuedSynchronizer#acquire() // AQS的获取锁方法
->ReentrantLock.FairSync#tryAcquire() // 尝试获取锁
->AbstractQueuedSynchronizer#addWaiter() // 添加到队列
->AbstractQueuedSynchronizer#enq() // 入队
->AbstractQueuedSynchronizer#acquireQueued() // 里面有个for()循环,唤醒后再次尝试获取锁
->AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire() // 检查是否要阻塞
->AbstractQueuedSynchronizer#parkAndCheckInterrupt() // 真正阻塞的地方

获取锁的主要过程大致如下:

(1)尝试获取锁,如果获取到了就直接返回了;

(2)尝试获取锁失败,再调用addWaiter()构建新节点并把新节点入队;

(3)然后调用acquireQueued()再次尝试获取锁,如果成功了,直接返回;

(4)如果再次失败,再调用shouldParkAfterFailedAcquire()将节点的等待状态置为等待唤醒(SIGNAL);

(5)调用parkAndCheckInterrupt()阻塞当前线程;

(6)如果被唤醒了,会继续在acquireQueued()的for()循环再次尝试获取锁,如果成功了就返回;

(7)如果不成功,再次阻塞,重复(3)(4)(5)直到成功获取到锁。

来源:https://www.cnblogs.com/tong-yuan/p/ReentrantLock.html

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com