首页 > 技术文章 > 一个简单的线程池实现

kamicoder 2017-02-11 21:41 原文

前段时间学习了线程方面的知识,看了关于线程池的教程,自己也试着实现一个。跟大家分享,同时也整理整理思路。

 

对线程池的要求:

1.用于处理大量短暂的任务。

2.动态增加线程,直到达到最大允许的线程数量。

3.动态销毁线程。

 

线程池的实现类似于”消费者--生产者”模型:

用一个队列存放任务(仓库,缓存)

主线程添加任务(生产者生产任务)

新建线程函数执行任务(消费者执行任务)

 

由于任务队列是全部线程共享的,就涉及到同步问题。这里采用条件变量和互斥锁来实现。

 

--------------------------------------------------------condition.h----------------------------------------------------------

 

/*

在此线程池中互斥锁和条件变量都是配套使用,编写此头文件使用比较方便

在此线程池中保护共享数据的都是用这个头文件中的函数

*/

 

#ifndef _CONDITION_H_

#define _CONDITION_H_

 

#include <pthread.h>

 

//有互斥锁和条件变量的结构体condition_t

typedef struct condition

{

         pthread_cond_t pcond;

         pthread_mutex_t pmutex;

} condition_t;

 

int condition_init(condition_t *cond)

{

         if(pthread_cond_init(&cond->pcond, NULL))

                   return 1;

         if(pthread_mutex_init(&cond->pmutex, NULL))

                   return 1;

         return 0; 

};

 

int condition_lock(condition_t *cond)

{

         return pthread_mutex_lock(&cond->pmutex);

}

 

int condition_unlock(condition_t *cond)

{

         return pthread_mutex_unlock(&cond->pmutex);

}

 

int condition_wait(condition_t *cond)

{

         return pthread_cond_wait(&cond->pcond, &cond->pmutex);

}

 

int condition_timedwait(condition_t *cond, const struct timespec *abstime)

{

        return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);

}

 

int condition_signal(condition_t *cond)

{

         return pthread_cond_signal(&cond->pcond);

}

 

int condition_broadcast(condition_t *cond)

{

         return pthread_cond_broadcast(&cond->pcond);

}

 

int condition_destroy(condition_t *cond)

{

         if(pthread_cond_destroy(&cond->pcond))

                   return 1;

         if(pthread_mutex_destroy(&cond->pmutex))

                   return 1;

         return 0;

}

 

#endif

 

 

 

-------------------------------------------------------threadpool.h--------------------------------------------------------

 

//线程池头文件

#ifndef _THREADPOOL_H_

#define _THREADPOOL_H_

 

#include "condition.h"

#include <stdio.h>

#include <stdlib.h>

#include <unistd.h>

#include <sys/time.h>

#include <time.h>

 

//线程任务结构体(队列形式存储)

typedef struct Task

{

         void* (*run) (void*arg);

         void *arg;

         struct Task *next;

} task_t;

 

//线程池结构体

typedef struct threadpool

{

         condition_t ready;                    //互斥锁与条件变量

         task_t *first;                      //任务队列头

         task_t *last;                                    //任务队列尾

         int counter;                                     //任务总数

         int maxthread;                      //最大线程数

         int idle;                         //正在等待的线程数量

         int quit;                         //销毁线程池标志

} threadpool_t;

 

//初始化线程池

void threadpool_init(threadpool_t *pool, int max_num)

{

         condition_init(&pool->ready);

         pool->first = NULL;

         pool->last = NULL;

         pool->counter = 0;

         pool->maxthread = max_num;

         pool->idle = 0;

         pool->quit = 0;

}

 

 

//新建线程函数

void *thread_routine(void *arg)

{

         struct timespec abstime;

         int timeout;                                  //是否超过等待任务时间

         threadpool_t *pool = (threadpool_t*)arg;

 

         printf("0x%0x thread is starting\n", (int)pthread_self());

 
     //让线程不会因为结束任务立刻销毁
         for(;;)                                            

         {

                   timeout = 0;

                   condition_lock(&pool->ready);                               //对全局变量任务队列进行操作,要加锁

                   pool->idle ++;

           //等待任务队列中有任务

                   while(pool->first == NULL && pool->quit == 0)

                   {

                            printf("0x%0x thread is waiting\n", (int)pthread_self());

                           

                            clock_gettime(CLOCK_REALTIME, &abstime);

                            abstime.tv_sec += 2;                                   //设置等待超时时间

                            int status = condition_timedwait(&pool->ready, &abstime);

                            if(status == 110)                                      //timewait函数超时返回110(TIMEDOUT)

                            {

                                     printf("0x%0x thread is timeout\n", (int)pthread_self());

                                     timeout = 1;

                                     break;

                            }

                           

                   }

        

                   //正在等待的线程数量减一

                   pool->idle --;    

           //若任务队列中有任务

                   if(pool->first != NULL)

                   {

                            task_t *t = pool->first;

                            pool->first = pool->first->next;

                //先解锁再执行任务函数,让其它线程可以对任务队列进行操作,提高效率

                            condition_unlock(&pool->ready);            

                            t->run(t->arg);

                            free(t);

                //任务执行完毕,重新上锁

                            condition_lock(&pool->ready);

                   }

                  

                   //有销毁线程池命令 并且 线程队列为空

                   if(pool->first == NULL && pool->quit == 1)

                   {

                            pool->counter--;                                            //减掉一个任务

                            if(pool->counter == 0)                      //任务全部完成,向销毁函数发起通知

                                      condition_signal(&pool->ready);

                            condition_unlock(&pool->ready);                 //不要忘记解锁

                            break;

                   }

 

                   if(pool->first == NULL && timeout == 1)                  //等待任务超时

                   {

                            pool->counter--;                          //减掉一个任务

                            condition_unlock(&pool->ready);                 //同样不要忘记解锁

                            break;

                   }

                   condition_unlock(&pool->ready);

         }

         printf("0x%0x threadpool is exiting\n", (int)pthread_self());

         return NULL;

}

 

//向任务队列中添加任务

void threadpool_add_task(threadpool_t *pool, void* (*run) (void*arg), void*arg)

{

         //动态分配空间给新任务

         task_t *newtask = (task_t*)malloc(sizeof(task_t));

         newtask->run = run;

         newtask->arg = arg;

         newtask->next = NULL;

        

     //要对任务队列进行操作,上锁

         condition_lock(&pool->ready);

 

         //添加新任务到任务队列

         if(pool->first == NULL)

                   pool->first = newtask;

         else

                   pool->last->next = newtask;

         pool->last = newtask;

 

         //如果有线程在等待,则不用创建新线程,直接发起通知处理任务

         if(0 < pool->idle)

                   condition_signal(&pool->ready);

 

     //当前线程数不能超过线程数,用<不用<=,因为counter初始为0

         else if(pool->counter < pool->maxthread)                

         {

                   pthread_t thread;

                   pthread_create(&thread, NULL, thread_routine, (void*)pool);

                   pool->counter++;

         }

 

         condition_unlock(&pool->ready);

}

 

//销毁线程池

void threadpool_destroy(threadpool_t *pool)

{

         if(pool->quit)                                                      //若已经销毁,直接返回,避免销毁两次

                   return;

         condition_lock(&pool->ready);

         pool->quit = 1;

         if(pool->counter > 0)

         {

                   condition_broadcast(&pool->ready);                //广播关闭正在等待的线程

                   if(pool->counter > 0)                                    //若有正在进行的线程等待线程结束的通知

                            condition_wait(&pool->ready);

         }

         condition_unlock(&pool->ready);

         condition_destroy(&pool->ready);                            //销毁互斥锁和条件变量

}

 

#endif

 

 

 

 

 

 

 

 

//测试代码(main函数):

#include "threadpool.h"

#include <stdio.h>

#include <stdlib.h>

#include <unistd.h>

#include <errno.h>

 

void* mytask(void *arg)

{

         printf("0x%0x thread run the task %d\n", (int)pthread_self(), *(int*)arg);

         free(arg);

         sleep(1);

         return NULL;

}

 

int main(int argc, char **argv)

{

         threadpool_t pool;

         int i;

         threadpool_init(&pool, 4);

         for (i = 0; i < 10; ++i)

         {

                   int *arg = (int *)malloc(sizeof(int));

                   *arg = i;

                   threadpool_add_task(&pool, mytask, (void *)arg);

         }

//      sleep(15);

         threadpool_destroy(&pool);

         return 0;

}

 

 

代码执行结果如下:

 

[kami@localhost 线程池]$ ./main

0x335fe700 thread is starting

0x31dfb700 thread is starting

0x31dfb700 thread run the task 1

0x335fe700 thread run the task 0

0x32dfd700 thread is starting

0x32dfd700 thread run the task 2

0x325fc700 thread is starting

0x325fc700 thread run the task 3

0x31dfb700 thread run the task 4

0x335fe700 thread run the task 5

0x32dfd700 thread run the task 6

0x325fc700 thread run the task 7

0x335fe700 thread run the task 9

0x325fc700 threadpool is exiting

0x31dfb700 thread run the task 8

0x32dfd700 threadpool is exiting

0x335fe700 threadpool is exiting

0x31dfb700 threadpool is exiting

 

可以看到一共处理了0~9十个任务;

最大线程数量为4(0x335fe700,0x31dfb700, 0x32dfd700, 0x325fc700 );

等待每个线程都退出了才结束进程;

 

第一次写博客,希望能够在这里学到更多,也分享一些自己学习的心得体会,共勉互励。

 

推荐阅读