详解Java 信号量Semaphore
作者:java小新人 发布时间:2021-12-22 11:10:36
Semaphore也是一个同步器,和前面两篇说的CountDownLatch和CyclicBarrier不同,这是递增的,初始化的时候可以指定一个值,但是不需要知道需要同步的线程个数,只需要在同步的地方调用acquire方法时指定需要同步的线程个数;
一.简单使用
同步两个子线程,只有其中两个子线程执行完毕,主线程才会执行:
package com.example.demo.study;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class Study0217 {
//创建一个信号量的实例,信号量初始值为0
static Semaphore semaphore = new Semaphore(0);
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(3);
pool.submit(()->{
System.out.println("Thread1---start");
//信号量加一
semaphore.release();
});
pool.submit(()->{
System.out.println("Thread2---start");
//信号量加一
semaphore.release();
});
pool.submit(()->{
System.out.println("Thread3---start");
//信号量加一
semaphore.release();
});
//等待两个子线程执行完毕就放过,必须要信号量等于2才放过
semaphore.acquire(2);
System.out.println("两个子线程执行完毕");
//关闭线程池,正在执行的任务继续执行
pool.shutdown();
}
}
这个信号量也可以复用,类似CyclicBarrier:
package com.example.demo.study;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class Study0217 {
//创建一个信号量的实例,信号量初始值为0
static Semaphore semaphore = new Semaphore(0);
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(3);
pool.submit(()->{
System.out.println("Thread1---start");
//信号量加一
semaphore.release();
});
pool.submit(()->{
System.out.println("Thread2---start");
//信号量加一
semaphore.release();
});
//等待两个子线程执行完毕就放过,必须要信号量等于2才放过
semaphore.acquire(2);
System.out.println("子线程1,2执行完毕");
pool.submit(()->{
System.out.println("Thread3---start");
//信号量加一
semaphore.release();
});
pool.submit(()->{
System.out.println("Thread4---start");
//信号量加一
semaphore.release();
});
semaphore.acquire(2);
System.out.println("子线程3,4执行完毕");
//关闭线程池,正在执行的任务继续执行
pool.shutdown();
}
}
二.信号量原理
看看下面这个图,可以知道信号量Semaphore还是根据AQS实现的,内部有个Sync工具类操作AQS,还分为公平策略和非公平策略;
构造器:
//默认是非公平策略
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//可以根据第二个参数选择是公平策略还是非公平策略
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
acquire(int permits)方法:
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
//AQS中的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
//这里根据子类是公平策略还是非公平策略
if (tryAcquireShared(arg) < 0)
//获取失败会进入这里,将线程放入阻塞队列,然后再尝试,还是失败的话就调用park方法挂起当前线程
doAcquireSharedInterruptibly(arg);
}
//非公平策略
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
//一个无限循环,获取state剩余的信号量,因为每调用一次release()方法的话,信号量就会加一,这里将
//最新的信号量减去传进来的参数比较,比如有两个线程,其中一个线程已经调用了release方法,然后调用acquire(2)方法,那么
//这里remaining的值就是-1,返回-1,然后当前线程就会被丢到阻塞队列中去了;如果另外一个线程也调用了release方法,
//那么此时的remaining==0,所以在这里的if中会调用CAS将0设置到state
//
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
//公平策略
//和上面非公平差不多,只不过这里会查看阻塞队列中当前节点前面有没有前驱节点,有的话直接返回-1,
//就会把当前线程丢到阻塞队列中阻塞去了,没有前驱节点的话,就跟非公平模式一样的了
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||compareAndSetState(available, remaining))
return remaining;
}
}
再看看release(int permits)方法:
//这个方法的作用就是将信号量加一
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
//AQS中方法
public final boolean releaseShared(int arg) {
//tryReleaseShared尝试释放资源
if (tryReleaseShared(arg)) {
//释放资源成功就调用park方法唤醒唤醒AQS队列中最前面的节点中的线程
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
//一个无限循环,获取state,然后加上传进去的参数,如果新的state的值小于旧的state,说明已经超过了state的最大值,溢出了
//没有溢出的话,就用CAS更新state的值
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//ws==Node.SIGNAL表示节点中线程需要被唤醒
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//调用阻塞队列中线程的unpark方法唤醒线程
unparkSuccessor(h);
}
//ws == 0表示节点中线程是初始状态
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
以最上面的例子简单说一下,其实不是很难,首先线程1和线程2分别去调用release方法,这个方法里面会将AQS中的state加一,但是在执行这个操作之前,主线程肯定会先到acquire(2),在这个方法里面,假如默认使用非公平策略,首先获取当前的信号量state(state的初始值是0),用当前信号量减去2,如果小于0,那么当前主线程就会丢到AQS队列中阻塞;
这个时候线程1的release方法执行了,于是就把信号量state加一(此时state==1),CAS更新state为一,成功的话,就调用doReleaseShared()方法唤醒AQS阻塞队列中最先挂起的线程(这里就是因为调用acquire方法而阻塞的主线程),主线程唤醒之后又会去获取最新的信号量,与2比较,发现还是小于0,于是又会阻塞;
线程2此时的release方法执行完成,重复线程一的操作,主线程唤醒之后(此时state==2),又去获取最新的信号量发现是2,减去acquire方法的参数2等于0,于是就用CAS更新state的值,然后acquire方法也就执行完毕,主线程继续执行后面的代码;
其实信号量还是很有意思的,记得在项目里,有人利用信号量实现了一个故障隔离,什么时候我可以把整理之后的代码贴出来分享一下,还是很有意思的,就跟springcloud的熔断机制差不多,场景是:比如你在service的一个方法调用第三方的接口,你不知道调不调得通,而且你不希望每次前端过来都会去调用,比如当调用失败的次数超过100次,那么五分钟之后才会再去实际调用这个第三方服务!这五分钟内前调用这个服务,就会触发我们这个故障隔离的机制,向前端返回一个特定的错误码和错误信息!
来源:https://www.cnblogs.com/wyq1995/p/12319707.html


猜你喜欢
- 本文实例为大家分享了Java实现FTP上传与下载的具体代码,供大家参考,具体内容如下JAVA操作FTP服务器,只需要创建一个FTPClien
- 谷歌有专门的SDK来完成VR,我这次以一个全景图片的例子来说一下这个SDK实现VR的基本过程,首先全景图片就是百度地图里的那样,能够看到周围
- Maven打包时指定启动类使用Maven打包的时候, 有时候需要指定启动类, 可如下操作!测试项目(结构如下):代码: com.xxx.Ma
- 栈和队列的本质是相同的,都只能在线性表的一端进行插入和删除。因此,栈和队列可以相互转换。用栈实现队列—力扣232题题目要求:仅使用两个栈实现
- 1.Fanout Exchange介绍Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队
- NavMesh(导航网格)是3D游戏世界中用于动态物体实现自动寻路的技术。NavMesh系统是人工智能的一种,它使用一个添加在游戏对象上或者
- 线程组我们可以把线程归属到某个线程组中,线程组可以包含多个线程以及线程组,线程和线程组组成了父子关系,是个树形结构,如下图:使用线程组可以方
- 目录1.Groovy特性2.核心涉及3.Java与Groovy转换第一步:引入Groovy依赖第二步:创建interface接口声明方法第三
- 前言Spring 5发布有两年了,随Spring 5一起发布了一个和Spring WebMvc同级的Spring WebFlux。这是一个支
- SessionFactory在Hibernate中实际上起到了一个缓冲区的作用 他缓冲了HI
- android RecyclerView不像过去的ListView那样随意的设置水平方向的分割线,如果要实现RecyclerView的水平/
- 一、快捷键添加代码块:++快速生成属性等:++导包:+++自动创建变量名:++查找源代码:++按条件查找替换:++快速查看当前类的所有方法:
- 1、什么是反射?在java开发中有一个非常重要的概念就是java反射机制,也是java的重要特征之一。反射的概念是由Smith在1982年首
- 内容简介本篇将介绍 Flutter 中如何完成图片上传,以及上传成功后的表单提交。涉及的知识点如下:图片选择插件wechat_assets_
- 在实际项目中,在处理较大的文件时,常常将文件拆分为多个子文件进行处理,最后再合并这些子文件。下面就为各位介绍下Java中合并多个文件的方法。
- 方式一: 配置文件 application.propertiesserver.port=7788方式二: java启动命令# 以应用参数的方
- AnyChat(全名叫Anychat SDK),也叫音视频互动开发平台;是一套跨平台的即时通讯解决方案,基于先进的H.264视频编码标准、A
- 相信对于一名JAVA开发者开说properties文件一定再熟悉不过了,比如一下配置:config.properties会经常存放一些系统常
- 一、Mybatis简介Mybatis是一款超级无敌的持久层框架,它支持自定义SQL、存储过程以及高级映射。Mybatis可以通过简单的XML
- Java(和其他语言)通过内部类支持嵌套类。要使其正常工作,需要编译器执行一些技巧。这是一个例子:public class Outer {