软件编程
位置:首页>> 软件编程>> java编程>> java Semaphore共享锁实现原理解析

java Semaphore共享锁实现原理解析

作者:小海编码日记  发布时间:2021-11-02 23:12:38 

标签:java,Semaphore,共享锁,锁

在线程间通信方式中,我们了解到可以使用Semaphore信号量来实现线程间通信,Semaphore支持公平锁和非公平锁,Semaphore底层是通过共享锁来实现的,其支持两种构造函数,如下所示:

// 默认使用非公平锁实现
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Semaphore提供的常用函数如下所示:

函数名说明备注
acquire获取锁/
release释放锁/

下面我们来看下Semaphore内部的实现原理

Semaphore内部类及继承关系

java Semaphore共享锁实现原理解析

可以看出Semaphore和ReentrantLock实现原理基本一致,包含NonfairSync和FairSync两个内部类,这两个内部类的父类均为AQS,不妨大胆猜测Semaphore也是依赖AQS实现的,接下来我们一起来看下Semaphore获取和释放锁的流程。

Semaphore.acquire流程分析(以非公平锁为例)

java Semaphore共享锁实现原理解析

从上图可以看出,针对阻塞线程的部分实现,和ReentrantLock基本一致,我们不做赘述,主要来看下前半部分的源码实现:

// Semaphore.java
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
// AbstractQueuedSynchronizer.java
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 如果线程是中断状态,抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 尝试获取共享资源
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

从源码可以看出acquire主要依赖于tryAcquireShared和doAcquireSharedInterruptibly,接下来我们来分别看下这两块的代码

tryAcquireShared

// NonfairSync
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
// Sync
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
// AbstractQueuedSynchronizer.java
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

从代码可以看出这里主要是根据申请的许可证数量,比较时否有许可证数量,如果可用许可证数量小于0,则直接返回,如果大于0,则通过CAS将state设置为可用许可证数量。

doAcquireSharedInterruptibly

当tryAcquireShared中返回的可用许可证数量小于0时,执行doAcquireSharedInterruptibly流程,代码如下:

// AbstractQueuedSynchronizer.java
// 在队尾新建Node对象并添加
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
// AbstractQueuedSynchronizer.java
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 将当前线程添加到等待队列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // for循环自旋
        for (;;) {
            // 获取node的前一个节点
            final Node p = node.predecessor();
            // 如果前一个节点是头节点
            if (p == head) {
                // 尝试获取锁
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 获取锁成功,更新node信息设置为头节点,并通知其他节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 判断是否需要阻塞线程,设置waitStatus并阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private void setHeadAndPropagate(Node node, int propagate) {
   Node h = head; // Record old head for check below
   setHead(node);
   if (propagate > 0 || h == null || h.waitStatus < 0 ||
       (h = head) == null || h.waitStatus < 0) {
       Node s = node.next;
       if (s == null || s.isShared())
           doReleaseShared();
   }
}

执行setHeadAndPropagate的主要目的在于,这里能获取到说明在该线程自旋过程中有线程释放了许可证,释放的许可证数量有可能还有剩余,所以传递给其他节点的线程,唤醒其他阻塞状态的线程也尝试去获取许可证。

Semaphore.release流程分析(以非公平锁为例)

java Semaphore共享锁实现原理解析

Semaphore.release流程相对而言,就比较简单,将release传递到AQS内部通过CAS更新许可证数量信息,更新完成后,遍历队列中Node节点,将Node waitStatus设置为0,并对对应线程执行unpark,相关代码如下:

protected final boolean tryReleaseShared(int releases) {
   for (;;) {
       int current = getState();
       int next = current + releases;
       if (next < current) // overflow
           throw new Error("Maximum permit count exceeded");
       // 通过CAS更新许可证数量
       if (compareAndSetState(current, next))
           return true;
   }
}
private void unparkSuccessor(Node node) {
   int ws = node.waitStatus;
   if (ws < 0)
       compareAndSetWaitStatus(node, ws, 0);
   Node s = node.next;
   if (s == null || s.waitStatus > 0) {
       s = null;
       for (Node t = tail; t != null && t != node; t = t.prev)
           if (t.waitStatus <= 0)
               s = t;
   }
   if (s != null)
       LockSupport.unpark(s.thread);
}
// 许可证数量更新完成后,调用该方法唤醒线程
private void doReleaseShared() {
   // 自旋
   for (;;) {
       Node h = head;
       if (h != null && h != tail) {
           int ws = h.waitStatus;
           if (ws == Node.SIGNAL) {
               if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                   continue;            // loop to recheck cases
               // 唤醒后继节点线程抢占许可证
               unparkSuccessor(h);
           }
           else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
               continue;                // loop on failed CAS
       }
       if (h == head)                   // loop if head changed
           break;
   }
}

综上,我们分析了Smaphore非公平锁的实现,感兴趣的可以分析下公平锁的实现,其本质区别在于在tryAcquireShared中只有当等待队列为空时,才会去尝试更新剩余许可证数量

来源:https://juejin.cn/post/7181724606389026876

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com