C语言实现支持动态拓展和销毁的线程池
作者:lijiao 发布时间:2023-05-11 10:47:42
标签:C语言,线程池
本文实例介绍了C 语言实现线程池,支持动态拓展和销毁,分享给大家供大家参考,具体内容如下
实现功能
1.初始化指定个数的线程
2.使用链表来管理任务队列
3.支持拓展动态线程
4.如果闲置线程过多,动态销毁部分线程
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <signal.h>
/*线程的任务队列由,函数和参数组成,任务由链表来进行管理*/
typedef struct thread_worker_s{
void *(*process)(void *arg); //处理函数
void *arg; //参数
struct thread_worker_s *next;
}thread_worker_t;
#define bool int
#define true 1
#define false 0
/*线程池中各线程状态描述*/
#define THREAD_STATE_RUN 0
#define THREAD_STATE_TASK_WAITING 1
#define THREAD_STATE_TASK_PROCESSING 2
#define THREAD_STATE_TASK_FINISHED 3
#define THREAD_STATE_EXIT 4
typedef struct thread_info_s{
pthread_t id;
int state;
struct thread_info_s *next;
}thread_info_t;
static char* thread_state_map[] ={"创建","等待任务","处理中","处理完成","已退出"};
/*线程压缩的时候只有 0,1,2,4 状态的线程可以销毁*/
/*线程池管理器*/
#define THREAD_BUSY_PERCENT 0.5 /*线程:任务 = 1:2 值越小,说明任务多,增加线程*/
#define THREAD_IDLE_PERCENT 2 /*线程:任务 = 2:1 值大于1,线程多于任务,销毁部分线程*/
typedef struct thread_pool_s{
pthread_mutex_t queue_lock ; //队列互斥锁,即涉及到队列修改时需要加锁
pthread_cond_t queue_ready; //队列条件锁,队列满足某个条件,触发等待这个条件的线程继续执行,比如说队列满了,队列空了
thread_worker_t *head ; //任务队列头指针
bool is_destroy ; //线程池是否已经销毁
int num; //线程的个数
int rnum; ; //正在跑的线程
int knum; ; //已杀死的线程
int queue_size ; //工作队列的大小
thread_info_t *threads ; //线程组id,通过pthread_join(thread_ids[0],NULL) 来执行线程
pthread_t display ; //打印线程
pthread_t destroy ; //定期销毁线程的线程id
pthread_t extend ;
float percent ; //线程个数于任务的比例 rnum/queue_size
int init_num ;
pthread_cond_t extend_ready ; //如果要增加线程
}thread_pool_t;
/*-------------------------函数声明----------------------*/
/**
* 1.初始化互斥变量
* 2.初始化等待变量
* 3.创建指定个数的线程线程
*/
thread_pool_t* thread_pool_create(int num);
void *thread_excute_route(void *arg);
/*调试函数*/
void debug(char *message,int flag){
if(flag)
printf("%s\n",message);
}
void *display_thread(void *arg);
/**
* 添加任务包括以下几个操作
* 1.将任务添加到队列末尾
* 2.通知等待进程来处理这个任务 pthread_cond_singal();
*/
int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void *arg),void *arg); //网线程池的队列中增加一个需要执行的函数,也就是任务
/**
* 销毁线程池,包括以下几个部分
* 1.通知所有等待的进程 pthread_cond_broadcase
* 2.等待所有的线程执行完
* 3.销毁任务列表
* 4.释放锁,释放条件
* 4.销毁线程池对象
*/
void *thread_pool_is_need_recovery(void *arg);
void *thread_pool_is_need_extend(void *arg);
void thread_pool_destory(thread_pool_t *pool);
thread_pool_t *thread_pool_create(int num){
if(num<1){
return NULL;
}
thread_pool_t *p;
p = (thread_pool_t*)malloc(sizeof(struct thread_pool_s));
if(p==NULL)
return NULL;
p->init_num = num;
/*初始化互斥变量与条件变量*/
pthread_mutex_init(&(p->queue_lock),NULL);
pthread_cond_init(&(p->queue_ready),NULL);
/*设置线程个数*/
p->num = num;
p->rnum = num;
p->knum = 0;
p->head = NULL;
p->queue_size =0;
p->is_destroy = false;
int i=0;
thread_info_t *tmp=NULL;
for(i=0;i<num;i++){
/*创建线程*/
tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s));
if(tmp==NULL){
free(p);
return NULL;
}else{
tmp->next = p->threads;
p->threads = tmp;
}
pthread_create(&(tmp->id),NULL,thread_excute_route,p);
tmp->state = THREAD_STATE_RUN;
}
/*显示*/
pthread_create(&(p->display),NULL,display_thread,p);
/*检测是否需要动态线程*/
//pthread_create(&(p->extend),NULL,thread_pool_is_need_extend,p);
/*动态销毁*/
pthread_create(&(p->destroy),NULL,thread_pool_is_need_recovery,p);
return p;
}
int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void*arg),void*arg){
thread_pool_t *p= pool;
thread_worker_t *worker=NULL,*member=NULL;
worker = (thread_worker_t*)malloc(sizeof(struct thread_worker_s));
int incr=0;
if(worker==NULL){
return -1;
}
worker->process = process;
worker->arg = arg;
worker->next = NULL;
thread_pool_is_need_extend(pool);
pthread_mutex_lock(&(p->queue_lock));
member = p->head;
if(member!=NULL){
while(member->next!=NULL)
member = member->next;
member->next = worker;
}else{
p->head = worker;
}
p->queue_size ++;
pthread_mutex_unlock(&(p->queue_lock));
pthread_cond_signal(&(p->queue_ready));
return 1;
}
void thread_pool_wait(thread_pool_t *pool){
thread_info_t *thread;
int i=0;
for(i=0;i<pool->num;i++){
thread = (thread_info_t*)(pool->threads+i);
thread->state = THREAD_STATE_EXIT;
pthread_join(thread->id,NULL);
}
}
void thread_pool_destory(thread_pool_t *pool){
thread_pool_t *p = pool;
thread_worker_t *member = NULL;
if(p->is_destroy)
return ;
p->is_destroy = true;
pthread_cond_broadcast(&(p->queue_ready));
thread_pool_wait(pool);
free(p->threads);
p->threads = NULL;
/*销毁任务列表*/
while(p->head){
member = p->head;
p->head = member->next;
free(member);
}
/*销毁线程列表*/
thread_info_t *tmp=NULL;
while(p->threads){
tmp = p->threads;
p->threads = tmp->next;
free(tmp);
}
pthread_mutex_destroy(&(p->queue_lock));
pthread_cond_destroy(&(p->queue_ready));
return ;
}
/*通过线程id,找到对应的线程*/
thread_info_t *get_thread_by_id(thread_pool_t *pool,pthread_t id){
thread_info_t *thread=NULL;
thread_info_t *p=pool->threads;
while(p!=NULL){
if(p->id==id)
return p;
p = p->next;
}
return NULL;
}
/*每个线程入口函数*/
void *thread_excute_route(void *arg){
thread_worker_t *worker = NULL;
thread_info_t *thread = NULL;
thread_pool_t* p = (thread_pool_t*)arg;
//printf("thread %lld create success\n",pthread_self());
while(1){
pthread_mutex_lock(&(p->queue_lock));
/*获取当前线程的id*/
pthread_t pthread_id = pthread_self();
/*设置当前状态*/
thread = get_thread_by_id(p,pthread_id);
/*线程池被销毁,并且没有任务了*/
if(p->is_destroy==true && p->queue_size ==0){
pthread_mutex_unlock(&(p->queue_lock));
thread->state = THREAD_STATE_EXIT;
p->knum ++;
p->rnum --;
pthread_exit(NULL);
}
if(thread){
thread->state = THREAD_STATE_TASK_WAITING; /*线程正在等待任务*/
}
/*线程池没有被销毁,没有任务到来就一直等待*/
while(p->queue_size==0 && !p->is_destroy){
pthread_cond_wait(&(p->queue_ready),&(p->queue_lock));
}
p->queue_size--;
worker = p->head;
p->head = worker->next;
pthread_mutex_unlock(&(p->queue_lock));
if(thread)
thread->state = THREAD_STATE_TASK_PROCESSING; /*线程正在执行任务*/
(*(worker->process))(worker->arg);
if(thread)
thread->state = THREAD_STATE_TASK_FINISHED; /*任务执行完成*/
free(worker);
worker = NULL;
}
}
/*拓展线程*/
void *thread_pool_is_need_extend(void *arg){
thread_pool_t *p = (thread_pool_t *)arg;
thread_pool_t *pool = p;
/*判断是否需要增加线程,最终目的 线程:任务=1:2*/
if(p->queue_size>100){
int incr =0;
if(((float)p->rnum/p->queue_size) < THREAD_BUSY_PERCENT ){
incr = (p->queue_size*THREAD_BUSY_PERCENT) - p->rnum; /*计算需要增加线程个数*/
int i=0;
thread_info_t *tmp=NULL;
thread_pool_t *p = pool;
pthread_mutex_lock(&pool->queue_lock);
if(p->queue_size<100){
pthread_mutex_unlock(&pool->queue_lock);
return ;
}
for(i=0;i<incr;i++){
/*创建线程*/
tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s));
if(tmp==NULL){
continue;
}else{
tmp->next = p->threads;
p->threads = tmp;
}
p->num ++;
p->rnum ++;
pthread_create(&(tmp->id),NULL,thread_excute_route,p);
tmp->state = THREAD_STATE_RUN;
}
pthread_mutex_unlock(&pool->queue_lock);
}
}
//pthread_cond_signal(&pool->extend_ready);
}
pthread_cond_t sum_ready;
/*恢复初始线程个数*/
void *thread_pool_is_need_recovery(void *arg){
thread_pool_t *pool = (thread_pool_t *)arg;
int i=0;
thread_info_t *tmp = NULL,*prev=NULL,*p1=NULL;
/*如果没有任务了,当前线程大于初始化的线程个数*/
while(1){
i=0;
if(pool->queue_size==0 && pool->rnum > pool->init_num ){
sleep(5);
/*5s秒内还是这个状态的话就,销毁部分线程*/
if(pool->queue_size==0 && pool->rnum > pool->init_num ){
pthread_mutex_lock(&pool->queue_lock);
tmp = pool->threads;
while((pool->rnum != pool->init_num) && tmp){
/*找到空闲的线程*/
if(tmp->state != THREAD_STATE_TASK_PROCESSING){
i++;
if(prev)
prev->next = tmp->next;
else
pool->threads = tmp->next;
pool->rnum --; /*正在运行的线程减一*/
pool->knum ++; /*销毁的线程加一*/
kill(tmp->id,SIGKILL); /*销毁线程*/
p1 = tmp;
tmp = tmp->next;
free(p1);
continue;
}
prev = tmp;
tmp = tmp->next;
}
pthread_mutex_unlock(&pool->queue_lock);
printf("5s内没有新任务销毁部分线程,销毁了 %d 个线程\n",i);
}
}
sleep(5);
}
}
/*打印一些信息的*/
void *display_thread(void *arg){
thread_pool_t *p =(thread_pool_t *)arg;
thread_info_t *thread = NULL;
int i=0;
while(1){
printf("threads %d,running %d,killed %d\n",p->num,p->rnum,p->knum); /*线程总数,正在跑的,已销毁的*/
thread = p->threads;
while(thread){
printf("id=%ld,state=%s\n",thread->id,thread_state_map[thread->state]);
thread = thread->next;
}
sleep(5);
}
}
希望本文所述对大家学习C语言程序设计有所帮助。
0
投稿
猜你喜欢
- 本文为大家分享Android自定义Spinner适配器的相关知识点,供大家参考,具体内容如下一、大致效果二.关键代码在注释中讲重点吧。 (1
- 实际开发中订单往往都包含着订单状态,用户每进行一次操作都要切换对应的状态,而每次切换判断当前的状态是必须的,就不可避免的引入一系列判断语句,
- 应用开发的时候,有时我们需要将一些图片进行预览,例如:相片管理的应用。这个时候用ListView的话就显得不是太合适了,因为Li
- 准备过程:在电脑桌面 右键点击 “此电脑”的“属性”选项选择“高级系统设置”选项点击下面的“环境变量”选项配置过程:点击“系统变量”下面的”
- 不好意思哦,上一篇Android自学开发第六篇代码控制界面挖了个坑,如果运行不起来的同学,请注意查看本篇文章。Android Project
- 前言我个人觉得,中间件的部署与使用是非常难记忆的;也就是说,如果两次使用中间件的时间间隔比较长,那基本上等于要重新学习使用。所以,我觉得学习
- 在拼接绝对路径的网址时,经常需要从Request.Url中获取根网址(比如https://git.oschina.net),然后与相对路径一
- 前言smart-doc 是一款同时支持 java restful api 和 Apache Dubbo rpc 接口文档生成的工具,smar
- ArrayList集合在查询元素时速度很快,但在增删元素时效率较低,为了克服这种局限性,可以使用List接口的另一个实现类LinkedLis
- 本文实例讲述了java采用中文方式显示时间的方法。分享给大家供大家参考。具体如下:其中t为秒,比如有时候需要计算两个任务相差多久,或者该任务
- 不适用click而用touch自定义监听:class myOnGestureListener extends GestureDetector
- 1.企业实际项目中Git的使用在实际的企业项目开发中,我们一般Java的项目在公司都有自己的局域网代码仓库,仓库上存放着很多的项目。以我工作
- 我就废话不多说了,大家还是直接看代码吧~ public Sprite LoadSourceSprite(string relat
- 最近项目中有截屏的需求,普通的view截屏方法网上一搜一大把,但是SurfaceView截屏黑屏问题很多文章说的并不清楚,自己参考了一些别的
- 本文实例为大家分享了java实现简单年龄计算器的具体代码,供大家参考,具体内容如下制作一个如下图年龄计算器根据题目,我做了一个由Calend
- 本文源码:GitHub·点这里 || GitEE·点这里一、Ehcache缓存简介1、基础简介EhCache是一个纯Java的进程内缓存框架
- 因为某个项目需要,为团队其他兄弟姐妹开发了一个 XML 分析处理器,并将其设计为一个类库,提供相应的 API 接口。为了方便大家的使用,需要
- 一、背景项目中要解析xml,由于Dom4j的诸多优点,我就用Dom4j解析xml,代码如下:public void readXML() {
- C#程序自删除核心实现方法就是调用 cmd 传入命令行,等待几秒之后删除文件;应用程序在运行时,是不能将 exe 文件进行删除的。但是可以将
- 1.新建springBoot项目在前面有两种方式2.加入thymeleaf模板引擎SpringBoot推荐使用thymeleaf模板引擎语法