软件编程
位置:首页>> 软件编程>> C语言>> C语言实现支持动态拓展和销毁的线程池

C语言实现支持动态拓展和销毁的线程池

作者:lijiao  发布时间:2023-05-11 10:47:42 

标签:C语言,线程池

本文实例介绍了C 语言实现线程池,支持动态拓展和销毁,分享给大家供大家参考,具体内容如下

实现功能

  • 1.初始化指定个数的线程

  • 2.使用链表来管理任务队列

  • 3.支持拓展动态线程

  • 4.如果闲置线程过多,动态销毁部分线程


#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <signal.h>

/*线程的任务队列由,函数和参数组成,任务由链表来进行管理*/
typedef struct thread_worker_s{
 void *(*process)(void *arg); //处理函数
 void *arg;          //参数
 struct thread_worker_s *next;
}thread_worker_t;

#define bool int
#define true 1
#define false 0

/*线程池中各线程状态描述*/
#define THREAD_STATE_RUN        0
#define THREAD_STATE_TASK_WAITING   1
#define THREAD_STATE_TASK_PROCESSING  2
#define THREAD_STATE_TASK_FINISHED   3
#define THREAD_STATE_EXIT       4  

typedef struct thread_info_s{
 pthread_t id;
 int    state;
 struct thread_info_s *next;
}thread_info_t;

static char* thread_state_map[] ={"创建","等待任务","处理中","处理完成","已退出"};
/*线程压缩的时候只有 0,1,2,4 状态的线程可以销毁*/

/*线程池管理器*/
#define THREAD_BUSY_PERCENT 0.5  /*线程:任务 = 1:2 值越小,说明任务多,增加线程*/
#define THREAD_IDLE_PERCENT 2   /*线程:任务 = 2:1 值大于1,线程多于任务,销毁部分线程*/

typedef struct thread_pool_s{
 pthread_mutex_t queue_lock ; //队列互斥锁,即涉及到队列修改时需要加锁
 pthread_cond_t queue_ready; //队列条件锁,队列满足某个条件,触发等待这个条件的线程继续执行,比如说队列满了,队列空了

thread_worker_t *head   ;    //任务队列头指针
 bool    is_destroy   ;    //线程池是否已经销毁
 int num;              //线程的个数
 int rnum;         ;    //正在跑的线程
 int knum;         ;    //已杀死的线程
 int queue_size       ;    //工作队列的大小
 thread_info_t *threads   ;    //线程组id,通过pthread_join(thread_ids[0],NULL) 来执行线程
 pthread_t   display   ;    //打印线程
 pthread_t   destroy   ;    //定期销毁线程的线程id
 pthread_t   extend    ;
 float percent       ;    //线程个数于任务的比例 rnum/queue_size
 int  init_num      ;
 pthread_cond_t  extend_ready     ;    //如果要增加线程
}thread_pool_t;

/*-------------------------函数声明----------------------*/
/**
* 1.初始化互斥变量
* 2.初始化等待变量
* 3.创建指定个数的线程线程
*/
thread_pool_t* thread_pool_create(int num);
void *thread_excute_route(void *arg);

/*调试函数*/
void debug(char *message,int flag){
 if(flag)
   printf("%s\n",message);
}

void *display_thread(void *arg);
/**
* 添加任务包括以下几个操作
* 1.将任务添加到队列末尾
* 2.通知等待进程来处理这个任务 pthread_cond_singal();
*/
int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void *arg),void *arg); //网线程池的队列中增加一个需要执行的函数,也就是任务

/**
* 销毁线程池,包括以下几个部分
* 1.通知所有等待的进程 pthread_cond_broadcase
* 2.等待所有的线程执行完
* 3.销毁任务列表
* 4.释放锁,释放条件
* 4.销毁线程池对象
*/

void *thread_pool_is_need_recovery(void *arg);
void *thread_pool_is_need_extend(void *arg);
void thread_pool_destory(thread_pool_t *pool);

thread_pool_t *thread_pool_create(int num){
 if(num<1){
   return NULL;
 }
 thread_pool_t *p;
 p = (thread_pool_t*)malloc(sizeof(struct thread_pool_s));
 if(p==NULL)
   return NULL;
 p->init_num = num;
 /*初始化互斥变量与条件变量*/
 pthread_mutex_init(&(p->queue_lock),NULL);
 pthread_cond_init(&(p->queue_ready),NULL);

/*设置线程个数*/
 p->num  = num;
 p->rnum = num;
 p->knum = 0;

p->head = NULL;
 p->queue_size =0;
 p->is_destroy = false;

int i=0;
 thread_info_t *tmp=NULL;
 for(i=0;i<num;i++){
   /*创建线程*/
   tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s));
   if(tmp==NULL){
     free(p);
     return NULL;
   }else{
     tmp->next = p->threads;
     p->threads = tmp;
   }
   pthread_create(&(tmp->id),NULL,thread_excute_route,p);
   tmp->state = THREAD_STATE_RUN;
 }

/*显示*/
 pthread_create(&(p->display),NULL,display_thread,p);
 /*检测是否需要动态线程*/
 //pthread_create(&(p->extend),NULL,thread_pool_is_need_extend,p);
 /*动态销毁*/
 pthread_create(&(p->destroy),NULL,thread_pool_is_need_recovery,p);
 return p;
}

int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void*arg),void*arg){
 thread_pool_t *p= pool;
 thread_worker_t *worker=NULL,*member=NULL;
 worker = (thread_worker_t*)malloc(sizeof(struct thread_worker_s));
 int incr=0;
 if(worker==NULL){
   return -1;
 }
 worker->process = process;
 worker->arg   = arg;
 worker->next  = NULL;
 thread_pool_is_need_extend(pool);
 pthread_mutex_lock(&(p->queue_lock));
 member = p->head;
 if(member!=NULL){
   while(member->next!=NULL)
     member = member->next;
   member->next = worker;
 }else{
   p->head = worker;
 }
 p->queue_size ++;
 pthread_mutex_unlock(&(p->queue_lock));
 pthread_cond_signal(&(p->queue_ready));
 return 1;
}

void thread_pool_wait(thread_pool_t *pool){
 thread_info_t *thread;
 int i=0;
 for(i=0;i<pool->num;i++){
   thread = (thread_info_t*)(pool->threads+i);
   thread->state = THREAD_STATE_EXIT;
   pthread_join(thread->id,NULL);
 }
}
void thread_pool_destory(thread_pool_t *pool){
 thread_pool_t  *p   = pool;
 thread_worker_t *member = NULL;

if(p->is_destroy)
   return ;
 p->is_destroy = true;
 pthread_cond_broadcast(&(p->queue_ready));
 thread_pool_wait(pool);
 free(p->threads);
 p->threads = NULL;
 /*销毁任务列表*/
 while(p->head){
   member = p->head;
   p->head = member->next;
   free(member);
 }
 /*销毁线程列表*/
 thread_info_t *tmp=NULL;
 while(p->threads){
   tmp = p->threads;
   p->threads = tmp->next;
   free(tmp);
 }

pthread_mutex_destroy(&(p->queue_lock));
 pthread_cond_destroy(&(p->queue_ready));
 return ;
}
/*通过线程id,找到对应的线程*/
thread_info_t *get_thread_by_id(thread_pool_t *pool,pthread_t id){
 thread_info_t *thread=NULL;
 thread_info_t *p=pool->threads;
 while(p!=NULL){
   if(p->id==id)
     return p;
   p = p->next;
 }
 return NULL;
}

/*每个线程入口函数*/
void *thread_excute_route(void *arg){
 thread_worker_t *worker = NULL;
 thread_info_t  *thread = NULL;
 thread_pool_t*  p = (thread_pool_t*)arg;
 //printf("thread %lld create success\n",pthread_self());
 while(1){
   pthread_mutex_lock(&(p->queue_lock));

/*获取当前线程的id*/
   pthread_t pthread_id = pthread_self();
   /*设置当前状态*/
   thread = get_thread_by_id(p,pthread_id);

/*线程池被销毁,并且没有任务了*/
   if(p->is_destroy==true && p->queue_size ==0){
     pthread_mutex_unlock(&(p->queue_lock));
     thread->state = THREAD_STATE_EXIT;
     p->knum ++;
     p->rnum --;
     pthread_exit(NULL);
   }
   if(thread){
     thread->state = THREAD_STATE_TASK_WAITING; /*线程正在等待任务*/
   }
   /*线程池没有被销毁,没有任务到来就一直等待*/
   while(p->queue_size==0 && !p->is_destroy){
     pthread_cond_wait(&(p->queue_ready),&(p->queue_lock));
   }
   p->queue_size--;
   worker = p->head;
   p->head = worker->next;
   pthread_mutex_unlock(&(p->queue_lock));

if(thread)
     thread->state = THREAD_STATE_TASK_PROCESSING; /*线程正在执行任务*/
   (*(worker->process))(worker->arg);
   if(thread)
     thread->state = THREAD_STATE_TASK_FINISHED;  /*任务执行完成*/
   free(worker);
   worker = NULL;
 }
}

/*拓展线程*/
void *thread_pool_is_need_extend(void *arg){
 thread_pool_t *p = (thread_pool_t *)arg;
 thread_pool_t *pool = p;
 /*判断是否需要增加线程,最终目的 线程:任务=1:2*/
 if(p->queue_size>100){
   int incr =0;
   if(((float)p->rnum/p->queue_size) < THREAD_BUSY_PERCENT ){
     incr = (p->queue_size*THREAD_BUSY_PERCENT) - p->rnum; /*计算需要增加线程个数*/
     int i=0;
     thread_info_t *tmp=NULL;
     thread_pool_t *p = pool;
     pthread_mutex_lock(&pool->queue_lock);
     if(p->queue_size<100){
       pthread_mutex_unlock(&pool->queue_lock);
       return ;
     }
     for(i=0;i<incr;i++){
       /*创建线程*/
       tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s));
       if(tmp==NULL){
         continue;
       }else{
         tmp->next = p->threads;
         p->threads = tmp;
       }
       p->num ++;
       p->rnum ++;
       pthread_create(&(tmp->id),NULL,thread_excute_route,p);
       tmp->state = THREAD_STATE_RUN;
     }
     pthread_mutex_unlock(&pool->queue_lock);
   }
 }
 //pthread_cond_signal(&pool->extend_ready);
}
pthread_cond_t sum_ready;
/*恢复初始线程个数*/
void *thread_pool_is_need_recovery(void *arg){
 thread_pool_t *pool = (thread_pool_t *)arg;
 int i=0;
 thread_info_t *tmp = NULL,*prev=NULL,*p1=NULL;
 /*如果没有任务了,当前线程大于初始化的线程个数*/
 while(1){
   i=0;
   if(pool->queue_size==0 && pool->rnum > pool->init_num ){
     sleep(5);
     /*5s秒内还是这个状态的话就,销毁部分线程*/
     if(pool->queue_size==0 && pool->rnum > pool->init_num ){
       pthread_mutex_lock(&pool->queue_lock);
       tmp = pool->threads;
       while((pool->rnum != pool->init_num) && tmp){
         /*找到空闲的线程*/
         if(tmp->state != THREAD_STATE_TASK_PROCESSING){
           i++;
           if(prev)
             prev->next  = tmp->next;
           else
             pool->threads = tmp->next;
           pool->rnum --; /*正在运行的线程减一*/
           pool->knum ++; /*销毁的线程加一*/
           kill(tmp->id,SIGKILL); /*销毁线程*/
           p1 = tmp;
           tmp = tmp->next;
           free(p1);
           continue;
         }
         prev = tmp;
         tmp = tmp->next;
       }
       pthread_mutex_unlock(&pool->queue_lock);
       printf("5s内没有新任务销毁部分线程,销毁了 %d 个线程\n",i);
     }
   }
   sleep(5);
 }
}

/*打印一些信息的*/
void *display_thread(void *arg){
 thread_pool_t *p =(thread_pool_t *)arg;
 thread_info_t *thread = NULL;
 int i=0;
 while(1){
   printf("threads %d,running %d,killed %d\n",p->num,p->rnum,p->knum);  /*线程总数,正在跑的,已销毁的*/
   thread = p->threads;
   while(thread){
     printf("id=%ld,state=%s\n",thread->id,thread_state_map[thread->state]);
     thread = thread->next;
   }
   sleep(5);
 }
}

希望本文所述对大家学习C语言程序设计有所帮助。

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com