解决线程池中ThreadGroup的坑
作者:kevin-go 发布时间:2023-08-24 00:13:47
线程池中ThreadGroup的坑
在Java中每一个线程都归属于某个线程组管理的一员,例如在主函数main()主工作流程中产生一个线程,则产生的线程属于main这个线程组管理的一员。简单地说,线程组(ThreadGroup)就是由线程组成的管理线程的类,这个类是java.lang.ThreadGroup类。
定义一个线程组,通过以下代码可以实现。
ThreadGroup group=new ThreadGroup(“groupName”);
Thread thread=new Thread(group,”the first thread of group”);
ThreadGroup类中的某些方法,可以对线程组中的线程产生作用。例如,setMaxPriority()方法可以设定线程组中的所有线程拥有最大的优先权。
所有线程都隶属于一个线程组。那可以是一个默认线程组(不指定group),亦可是一个创建线程时明确指定的组。在创建之初,线程被限制到一个组里,而且不能改变到一个不同的组。每个应用都至少有一个线程从属于系统线程组。若创建多个线程而不指定一个组,它们就会自动归属于系统线程组。
线程组也必须从属于其他线程组。必须在构建器里指定新线程组从属于哪个线程组。若在创建一个线程组的时候没有指定它的归属,则同样会自动成为系统线程组的一名属下。因此,一个应用程序中的所有线程组最终都会将系统线程组作为自己的“父”。
那么假如我们需要在线程池中实现一个带自定义ThreadGroup的线程分组,该怎么实现呢?
我们在给线程池(ThreadPoolExecutor)提交任务的时候可以通过execute(Runnable command)来将一个线程任务加入到该线程池,那么我们是否可以通过new一个指定了ThreadGroup的Thread实例来加入线程池来达到前面说到的目的呢?
ThreadGroup是否可行
通过new Thread(threadGroup,runnable)实现线程池中任务分组
public static void main(String[] args) {
ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
final ThreadGroup group = new ThreadGroup("Main_Test_Group");
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(group, new Runnable() {
@Override
public void run() {
int sleep = (int)(Math.random() * 10);
try {
Thread.sleep(1000 * 3);
System.out.println(Thread.currentThread().getName()+"执行完毕");
System.out.println("当前线程组中的运行线程数"+group.activeCount());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, group.getName()+" #"+i+"");
pool.execute(thread);
}
}
运行结果
pool-1-thread-3执行完毕
pool-1-thread-1执行完毕
当前线程组中的运行线程数0
pool-1-thread-2执行完毕
当前线程组中的运行线程数0
当前线程组中的运行线程数0
pool-1-thread-4执行完毕
pool-1-thread-5执行完毕
当前线程组中的运行线程数0
当前线程组中的运行线程数0
运行结果中可以看到group中的线程并没有因为线程池启动了这个线程任务而运行起来.因此通过线程组来对线程池中的线层任务分组不可行.
从java.util.concurrent.ThreadPoolExecutor源码中可以看到如下构造函数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
如果我们在实例化ThreadPoolExecutor时不指定ThreadFactory,那么将以默认的ThreadFactory来创建Thread.
Executors内部类DefaultThreadFactory
下面的源码即是默认的Thread工厂
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
从唯一的构造函数可以看到DefaultThreadFactory以SecurityManager 实例中的ThreadGroup来指定线程的group,如果SecurityManager 获取到的ThreadGroup为null才默认以当前线程的group来指定.public Thread newThread(Runnable r) 则以group来new 一个Thead.这样我们可以在实例化ThreadPoolExecutor对象的时候在其构造函数内传入自定义的ThreadFactory实例即可达到目的.
public class MyTheadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
private ThreadGroup defaultGroup;
public MyTheadFactory() {
SecurityManager s = System.getSecurityManager();
defaultGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
}
public MyTheadFactory(ThreadGroup group) {
this.defaultGroup = group;
namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(defaultGroup, null, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
ThreadGroup的使用及手写线程池
监听线程异常关闭
以下代码在window下不方便测试,需在linux 上 测试
// 以下线程如果强制关闭的话,是无法打印`线程被杀掉了`
// 模拟关闭 kill PID
public static void main(String[] args) {
Runtime.getRuntime().addShutdownHook(new Thread( () -> {
System.out.println("线程被杀掉了");
}));
while(true){
System.out.println("i am working ...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
如何拿到Thread线程中异常
public static void main(String[] args) {
Thread thread = new Thread(() -> {
try {
Thread.sleep(1000);
int i = 10/0;
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread.setUncaughtExceptionHandler((t,e)->{
System.out.println("线程的名字"+ t.getName());
System.out.println(e);
}); // 通过注入接口的方式
thread.start();
}
ThreadGroup
注意: threadGroup 设置为isDaemon 后,会随最后一个线程结束而销毁,如果没有设置isDaemon ,则需要手动调用 destory()
线程池使用
自己搭建的简单线程池实现
其中ThreadGroup 的应用没有写,但是我们可以观察线程关闭后,检查ThreadGroup 中是否还有活跃的线程等,具体参考ThreadGroup API
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.IntStream;
/**
* @Author: shengjm
* @Date: 2020/2/10 9:52
* @Description:
*/
public class SimpleThreadPool extends Thread{
/**
* 线程数量
*/
private int size;
private final int queueSize;
/**
* 默认线程队列数量
*/
private final static int DEFAULR_TASK_QUEUE_SIZE = 2000;
private static volatile int seq = 0;
private final static String THREAD_PREFIX = "SIMPLE_THREAD_POLL_";
private final static ThreadGroup GROUP = new ThreadGroup("Pool_Group");
private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>();
private final static List<WorkerTask> THREAD_QUEUE = new ArrayList<>();
private final DiscardPolicy discardPolicy;
private volatile boolean destory = false;
private int min;
private int max;
private int active;
/**
* 定义异常策略的实现
*/
private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> {
throw new DiscardException("线程池已经被撑爆了,后继多余的人将丢失");
};
/**
*
*/
public SimpleThreadPool(){
this(4,8,12,DEFAULR_TASK_QUEUE_SIZE,DEFAULT_DISCARD_POLICY);
}
/**
*
*/
public SimpleThreadPool(int min , int active , int max , int queueSize,DiscardPolicy discardPolicy) {
this.min = min;
this.active = active;
this.max = max;
this.queueSize = queueSize;
this.discardPolicy = discardPolicy;
init();
}
/**
* 初始化
*/
private void init() {
for(int i = 0; i < min; i++){
createWorkTask();
}
this.size = min;
this.start();
}
private void createWorkTask(){
WorkerTask task = new WorkerTask(GROUP,THREAD_PREFIX+(seq++));
task.start();
THREAD_QUEUE.add(task);
}
/**
* 线程池自动扩充
*/
@Override
public void run() {
while(!destory){
System.out.println(this.min +" --- "+this.active+" --- "+this.max + " --- "+ this.size + " --- "+ TASK_QUEUE.size());
try {
Thread.sleep(1000);
if(TASK_QUEUE.size() > active && size < active){
for (int i = size; i < active;i++){
createWorkTask();
}
size = active;
}else if(TASK_QUEUE.size() > max && size < max){
for (int i = size; i < max;i++){
createWorkTask();
}
size = max;
}
synchronized (THREAD_QUEUE){
if(TASK_QUEUE.isEmpty() && size > active){
int release = size - active;
for (Iterator<WorkerTask> it = THREAD_QUEUE.iterator();it.hasNext();){
if(release <=0){
break;
}
WorkerTask task = it.next();
task.close();
task.interrupt();
it.remove();
release--;
}
size = active;
}
}
} catch (InterruptedException e) {
break;
}
}
}
public void submit(Runnable runnable){
synchronized (TASK_QUEUE){
if(destory){
throw new DiscardException("线程池已经被摧毁了...");
}
if(TASK_QUEUE.size() > queueSize){
discardPolicy.discard();
}
TASK_QUEUE.addLast(runnable);
TASK_QUEUE.notifyAll();
}
}
/**
* 关闭
*/
public void shutdown(){
while(!TASK_QUEUE.isEmpty()){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized (THREAD_QUEUE) {
int initVal = THREAD_QUEUE.size();
while (initVal > 0) {
for (WorkerTask workerTask : THREAD_QUEUE) {
if (workerTask.getTaskState() == TaskState.BLOCKED) {
workerTask.interrupt();
workerTask.close();
initVal--;
} else {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
this.destory = true;
}
}
public int getSize() {
return size;
}
public int getMin() {
return min;
}
public int getMax() {
return max;
}
public int getActive() {
return active;
}
/**
* 线程状态
*/
private enum TaskState{
FREE , RUNNING , BLOCKED , DEAD
}
/**
* 自定义异常类
*/
public static class DiscardException extends RuntimeException{
public DiscardException(String message){
super(message);
}
}
/**
* 定义异常策略
*/
@FunctionalInterface
public interface DiscardPolicy{
void discard() throws DiscardException;
}
private static class WorkerTask extends Thread{
private volatile TaskState taskState = TaskState.FREE;
public TaskState getTaskState(){
return this.taskState;
}
public WorkerTask(ThreadGroup group , String name){
super(group , name);
}
@Override
public void run(){
OUTER:
while(this.taskState != TaskState.DEAD){
Runnable runnable;
synchronized (TASK_QUEUE){
while(TASK_QUEUE.isEmpty()){
try {
taskState = TaskState.BLOCKED;
TASK_QUEUE.wait();
} catch (InterruptedException e) {
break OUTER;
}
}
runnable = TASK_QUEUE.removeFirst();
}
if(runnable != null){
taskState = TaskState.RUNNING;
runnable.run();
taskState = TaskState.FREE;
}
}
}
public void close(){
this.taskState = TaskState.DEAD;
}
}
/**
* 测试
* @param args
*/
public static void main(String[] args) {
SimpleThreadPool simpleThreadPool = new SimpleThreadPool();
// SimpleThreadPool simpleThreadPool = new SimpleThreadPool(6,15,SimpleThreadPool.DEFAULT_DISCARD_POLICY);
IntStream.rangeClosed(0,40).forEach(i -> {
simpleThreadPool.submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("the runnable " + i + "be servered by " + Thread.currentThread());
});
});
// try {
// Thread.sleep(15000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
simpleThreadPool.shutdown();
}
}
来源:https://blog.csdn.net/tyBaoErGe/article/details/50196379


猜你喜欢
- 一、安装Maven下载地址:https://maven.apache.org/download.cgi把下载的安装包解压tar -xvf a
- @JSONField看源码它可以作用于字段和方法上。引用网上说的,一、作用Field@JSONField作用在Field时,其name不仅定
- 本文实例讲述了C#实现翻转字符串的方法。分享给大家供大家参考。具体实现方法如下:Func<string, string> Rev
- 目录前言项目环境1.线程池示例2.shutdown3.isShutdown4.isTerminated5.awaitTermination6
- 对象的初始化和清理生活中我们买的电子产品都基本会有出厂设置,在某一天我们不用时候也会删除一些自己信息数据保证安全。C++中的面向对象来源于生
- 步骤,如图所示:1.添加异步任务业务类package top.ytheng.demo.task;import java.util.concu
- Spring @Cacheable指定失效时间新版本配置@Configuration@EnableCachingpublic class R
- java -jar设置添加启动参数方法java -jar 参数前后位置说明springboot项目启动的时候可以直接使用java -jar
- 本文实例讲述了winform绑定快捷键的方法。分享给大家供大家参考。具体分析如下:第一种:Alt + *(按钮快捷键)在大家给button、
- 问题,打一个页面cpu暴涨,打开一次就涨100%,一会系统就卡的不行了。排查方法,因为是线上的linux,没有用jvm监控工具rim链接上去
- 无论是我们在使用word还是记事本,系统都会为我们提供撤销的功能,这几乎是人人都会使用到的功能,而在我们实际开发中,会不会存在一个很复杂的对
- 数据的类型介绍类型的基本归类在写数据类型的介绍之前,我们首先来简单介绍下 release版本与debug版本之间的在内存上的区别:我们先将下
- 使用filter对request body参数进行校验@Slf4jpublic class ParameterCheckServletReq
- 我想大家对DateTime.ToString()方法的用法肯定已经非常熟悉了,但我想大家用过的大部分用法都是:DateTime.ToStri
- 目录类划分时关于内聚性的问题静态类的设计高内聚类的设计附:面向过程编程中模块的内聚性偶然内聚或巧合内聚(Coincidental)逻辑内聚(
- 在上个月的对C#开发微信门户及应用做了介绍,写过了几篇的随笔进行分享,由于时间关系,间隔了一段时间没有继续写这个系列的博客了,并不是对这个方
- 01.点明观点 C#中,非托管资源使用之后必须释放,而using()是使用非托管资源的最佳方式,可以确保资源在代码块结束之后被正确
- 这篇文章主要介绍了Java如何把数组转换为ArrayList,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需
- 目录1、对于A、B两种排队方式,说法正确的是2、Inter-process communication (IPC) is the trans
- ReentrantLock锁ReentrantLock是Java中常用的锁,属于乐观锁类型,多线程并 * 况下。能保证共享数据安全性,线程间有