许多服务器应用程序,如Web服务器,数据库服务器等都面临着处理众多客户端发起的请求,这些请求往往都是短暂的、大量的。如果采用为每个到达的请求创建一个新线程来响应客户端请求的话,至少存在以下两个缺点:
其一,为每个请求都创建一个线程的开销会非常大;其二,除了创建和销毁线程的开销外,大量活动线程还会消耗系统资源,导致系统内存不足或由于内存消耗过多而导致系统崩溃。
线程池为上述提及的线程生命周期开销问题及系统资源消耗问题提供了解决方案。
1.概念
在计算机程序设计中,线程池(thread pool)[1]是一种用来实现计算机程序并发性编程的软件设计模式。一个线程池维护着多个线程,等待监控程序分配并发执行的任务。通过维护线程池,该模型可以避免线程的频繁创建和销毁从而提高了程序的性能。
为线程执行调度任务的一种常见方法是使用同步队列,通常称之为任务队列(task queue)[1],线程池将任务队列中等待的任务移除并交给空闲的线程进行相应的处理。如图1-1所示:
图1-1
2.性能
线程池的大小是为执行任务而保留的线程数量,它通常是应用程序的可调参数,经过调整以优化程序性能。
相比于为每个任务创建一个新线程,使用线程池的开销要远远小于前者,这会为程序带来更好的性能和稳定性。因为,创建和销毁一个线程及其相关资源是一个代价高昂的过程。但是,线程池中的线程的数量也不是越多越好,线程过多的话会不仅会浪费内存,并且在线程之间的上下文切换也会导致性能的下降。
在应用程序的生命周期中,我们还可以根据任务队列中的等待任务的数量动态的调整线程池中线程的数量(如文章最后的示例所示)。例如,对一个web服务器而言,当有大量的web请求到来时,web服务器可以添加线程,而当这些请求逐渐减少时,web服务器也可以销毁部分线程。而至于添加或销毁多少线程,都与系统的性能息息相关,因此我们需要知道:
(1)创建太多线程不仅会浪费系统资源,还会浪费时间;
(2)同理,盲目销毁太多的线程也不可取;
(3)创建线程的速度过慢的话可能会导致客户端性能低下(等待时间长);
(4)销毁线程的速度过慢的话可能会导致系统资源的枯竭。
因此,线程的多少直接影响到程序的性能,一个线程池中究竟需要创建多少个线程取决于多个方面,如CPU、内存以及业务逻辑等。
3.工作队列
在查阅相关资料之后,本人还是没能找到线程池和工作队列两个术语之间区别的权威说明。其中,参考文献[3]中指出,“The work queue is a simple and elegant type of thread pool that creates requested number of threads at its creation and manages a queue of different work items that implement the specific tasks, where each work item in his turn gets a thread that works and processes it.”。
结合查阅的资料,我个人所倾向的理解是:线程池是一种软件设计模式,而工作队列则是这种模式的一个具体实现。
4.示例
当创建工作队列时,可以指定需要的最大并发级别。我们可以通过“工作量”的大小,适当的增加或减少线程。并且,如果一个线程在经过一定的时间后仍然没有获得“工作”,那么我们可以将其销毁以达到释放资源的目的。
(1)工作队列
work_queue.h及work_queue.c两个文件包含了工作队列的具体实现,并且我在代码中加了较为详细的注释,此处不再赘述。
work_queue.h源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
/* * filename: work_queue.h * author: leo */ #include <pthread.h> /* * 工作队列中工作元素的内部表示 */ typedef struct workq_ele_tag{ struct workq_ele_tag *next; //指向下一个元素 void *data; //工作元素的有效数据 }workq_ele_t; /* * 工作队列的外部表示 */ typedef struct workq_tag{ pthread_mutex_t mutex; //互斥量,用来序列化对工作队列的访问 pthread_cond_t cond; //条件变量,线程在上面等待队列中的工作 pthread_attr_t attr; //线程的属性变量 workq_ele_t *first,*last; //分别指向队列中的第一个和最后一个工作元素 int quit; //标志位,1表示销毁线程 int parallelism; //允许并发的最大线程数量 int counter; //记录创建的线程数 int idle; //记录空闲线程数 void (*router)(void *arg); //工作函数 }workq_t; /* * 工作队列接口 */ /*初始化工作队列。wq:工作队列,threads:最大并发线程数,router:工作函数*/ int workq_init(workq_t *wq, int threads, void (*router)(void *)); /*销毁工作队列*/ int workq_destroy(workq_t *wq); /*向工作队列中添加元素。wq:工作队列,data:传递给工作函数的参数*/ int workq_add(workq_t *wq, void *data); |
work_queue.c源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 |
/* * filename: work_queue.c * author: leo */ #include <pthread.h> #include <stdlib.h> #include <stdio.h> #include <time.h> #include <errno.h> #include <string.h> #include "work_queue.h" static void *workq_start(void *arg); /* * 初始化工作队列 */ int workq_init(workq_t *wq, int threads, void (*router)(void *arg)) { int ret; /*线程属性变量初始化*/ ret = pthread_attr_init(&wq->attr); if(ret != 0) return ret; /*线程属性设置为脱离的*/ ret = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED); if(ret != 0){ pthread_attr_destroy(&wq->attr); return ret; } /*互斥量初始化*/ ret = pthread_mutex_init(&wq->mutex, NULL); if(ret != 0){ pthread_attr_destroy(&wq->attr); return ret; } /*条件变量初始化*/ ret = pthread_cond_init(&wq->cond, NULL); if(ret != 0){ pthread_mutex_destroy(&wq->mutex); pthread_attr_destroy(&wq->attr); return ret; } wq->quit = 0; wq->first = wq->last = NULL; wq->parallelism = threads; wq->counter = 0; wq->idle = 0; wq->router = router; return 0; } /* * 销毁工作队列 */ int workq_destroy(workq_t *wq) { int ret, ret1, ret2; /*进入临界区,避免多个线程同时销毁工作队列*/ ret = pthread_mutex_lock(&wq->mutex); if(ret != 0) return ret; /*线程池中的工作线程的数量大于0*/ if(wq->counter > 0){ /*销毁工作队列标志位置为1*/ wq->quit = 1; /*唤醒所有空闲的线程,以便销毁它们*/ if(wq->idle > 0){ ret = pthread_cond_broadcast(&wq->cond); if(ret != 0){ pthread_mutex_unlock(&wq->mutex); return ret; } } /*在工作线程数量不为0之前,销毁工作队列的工作会阻塞于此, *直到最后一个工作线程即将销毁时,阻塞解除*/ while(wq->counter > 0){ ret = pthread_cond_wait(&wq->cond, &wq->mutex); if(ret != 0){ pthread_mutex_unlock(&wq->mutex); return ret; } } } /*退出临界区*/ ret = pthread_mutex_unlock(&wq->mutex); if(ret != 0){ return ret; } /*销毁互斥量*/ ret = pthread_mutex_destroy(&wq->mutex); /*销毁条件变量*/ ret1 = pthread_cond_destroy(&wq->cond); /*销毁线程属性变量*/ ret2 = pthread_attr_destroy(&wq->attr); return (ret ? ret : (ret1 ? ret1 : ret2)); } /* * 向工作队列中添加工作元素 */ int workq_add(workq_t *wq, void *element) { workq_ele_t *item; pthread_t id; int ret; /*初始化一个工作队列元素*/ item = (workq_ele_t *)malloc(sizeof(workq_ele_t)); if(item == NULL){ perror("malloc"); return -1; } item->data = element; item->next = NULL; /*进入临界区,向工作队列中添加元素*/ ret = pthread_mutex_lock(&wq->mutex); if(ret != 0){ free(item); return ret; } if(wq->first == NULL) wq->first = item; else wq->last->next = item; wq->last = item; /*开始分配任务*/ /*1.如果有空闲的线程,则唤醒一个空闲线程*/ if(wq->idle > 0){ ret = pthread_cond_signal(&wq->cond); if(ret != 0){ pthread_mutex_unlock(&wq->mutex); return ret; } } /*2.没有空闲线程的话,如果当前的线程数量没有达到最大的并发线程数,则创建一个新线程*/ else if(wq->counter < wq->parallelism){ printf("Creating a new worker\n"); /* * 新创建线程的入口函数为workq_start,该函数为工作队列的启动函数, * 它会根据当前工作队列的状况做出相应的处理,比如:队列为空?队列不为空? * 以及工作队列的销毁标志位quit是否被置为1?等等 */ ret = pthread_create(&id, &wq->attr, workq_start, (void*)wq); if(ret != 0){ pthread_mutex_unlock(&wq->mutex); return ret; } /*每创建一个线程,counter计数加1*/ wq->counter++; } /*退出临界区*/ pthread_mutex_unlock(&wq->mutex); return 0; } /* * 工作队列的启动函数 */ static void *workq_start(void *arg) { struct timespec timeout; workq_t *wq = (workq_t*)arg; workq_ele_t *we; int ret, timedout; //printf("A worker is starting\n"); /*进入临界区*/ ret = pthread_mutex_lock(&wq->mutex); if(ret != 0) return NULL; while(1){ /*线程超时标志*/ timedout = 0; printf("Worker[%lu] waiting for work\n", pthread_self()); /*获取当前时间*/ clock_gettime(CLOCK_REALTIME, &timeout); /*线程超时时间设置为2秒*/ timeout.tv_sec += 2; /*当队列为空(quit标志位未被置为1的情况下)*/ while(wq->first == NULL && !wq->quit){ /*队列为空时,工作线程等待2秒后若仍然没有被分配任务, *则超时标志置为1,该工作线程销毁*/ ret = pthread_cond_timedwait(&wq->cond, &wq->mutex, &timeout); if(ret == ETIMEDOUT){ printf("Worker[%lu] wait timed out\n", pthread_self()); timedout = 1; break; } else if(ret != 0){ printf("Worker wait failed, %d(%s)\n", ret, strerror(ret)); wq->counter--; pthread_mutex_unlock(&wq->mutex); return NULL; } } /*当队列不为空(quit标志位未被置为1的情况下),从队列头取出一个任务并处理*/ we = wq->first; /*调整队列结构*/ if(we != NULL){ /*设置新的队列头*/ wq->first = we->next; /*若取出的元素是最后一个元素,则设置队列尾元素为空*/ if(wq->last == we) wq->last = NULL; /*退出临界区*/ ret = pthread_mutex_unlock(&wq->mutex); if(ret != 0) return NULL; /*工作线程调用工作函数开始工作...*/ //printf("Worker working..."); wq->router(we->data); /*完成工作后,释放we结构*/ free(we); ret = pthread_mutex_lock(&wq->mutex); if(ret != 0) return NULL; } /*当工作队列为空,并且quit标志位被置为1*/ if(wq->first == NULL && wq->quit){ printf("Worker[%lu] shutting down\n", pthread_self()); wq->counter--; /*当线程池中的线程数量为0后发出广播,此时阻塞在调用workq_destroy *函数的主线程会解除阻塞,并完成工作队列最后的销毁工作*/ if(wq->counter == 0) pthread_cond_broadcast(&wq->cond); pthread_mutex_unlock(&wq->mutex); return NULL; } /*当工作队列为空,并且线程超时后,自动退出*/ if(wq->first == NULL && timedout){ printf("Worker[%lu] terminating due to timed out\n", pthread_self()); wq->counter--; break; } } /*退出临界区*/ pthread_mutex_unlock(&wq->mutex); printf("Worker exiting...\n"); return NULL; } |
(2)实例
main.c实例中,我们初始化一个工作队列,并向工作队列中添加10个任务,main.c源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
/* * filename: main.c * author: leo */ #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include "work_queue.h" void my_router(void *arg) { printf("Worker[%lu] working on task[%d]\n", pthread_self(), *(int*)arg); /*休眠0-3秒*/ sleep(rand()%4); } int main() { int ret; workq_t workq; /*初始化工作队列*/ ret = workq_init(&workq, 5, my_router); if(ret != 0){ fprintf(stderr, strerror(ret)); return -1; } /*投入10个任务*/ int *task = (int*)malloc(sizeof(int)*10); if(task == NULL){ perror("malloc"); return -1; } int i; for(i=0; i<10; ++i){ task[i] = i; /*每隔1秒投入一个任务*/ sleep(1); ret = workq_add(&workq, &task[i]); if(ret != 0){ fprintf(stderr, strerror(ret)); free(task); return -1; } } /*销毁工作队列*/ ret = workq_destroy(&workq); if(ret != 0){ fprintf(stderr, strerror(ret)); free(task); return -1; } free(task); return 0; } |
(3)编译、运行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
[root@localhost demo]# gcc -Wall -g main.c work_queue.h work_queue.c -o main -lpthread -lrt [root@localhost demo]# ./main Creating a new worker Worker[139668634634000] waiting for work Worker[139668634634000] working on task[0] Creating a new worker Worker[139668624144144] waiting for work Worker[139668624144144] working on task[1] Creating a new worker Worker[139668613654288] waiting for work Worker[139668613654288] working on task[2] Worker[139668634634000] waiting for work Worker[139668624144144] waiting for work Creating a new worker Worker[139668613654288] waiting for work Worker[139668613654288] working on task[3] Worker[139668603164432] waiting for work Creating a new worker Worker[139668592674576] waiting for work Worker[139668592674576] working on task[4] Worker[139668634634000] wait timed out Worker[139668634634000] terminating due to timed out Worker exiting... Worker[139668624144144] wait timed out Worker[139668624144144] terminating due to timed out Worker exiting... Worker[139668603164432] wait timed out Worker[139668603164432] terminating due to timed out Worker exiting... Creating a new worker Worker[139668592674576] waiting for work Worker[139668592674576] working on task[5] Worker[139668603164432] waiting for work Worker[139668613654288] waiting for work Creating a new worker Worker[139668624144144] waiting for work Worker[139668624144144] working on task[6] Worker[139668603164432] wait timed out Worker[139668603164432] terminating due to timed out Worker exiting... Creating a new worker Worker[139668603164432] waiting for work Worker[139668603164432] working on task[7] Worker[139668603164432] waiting for work Worker[139668613654288] wait timed out Worker[139668613654288] terminating due to timed out Worker exiting... Worker[139668592674576] waiting for work Creating a new worker Worker[139668624144144] waiting for work Worker[139668624144144] working on task[8] Worker[139668613654288] waiting for work Creating a new worker Worker[139668624144144] waiting for work Worker[139668624144144] working on task[9] Worker[139668603164432] wait timed out Worker[139668603164432] shutting down Worker[139668634634000] waiting for work Worker[139668634634000] shutting down Worker[139668592674576] wait timed out Worker[139668592674576] shutting down Worker[139668613654288] wait timed out Worker[139668613654288] shutting down Worker[139668624144144] shutting down |
参考:
[1] https://en.wikipedia.org/wiki/Thread_pool
[2] https://www.ibm.com/developerworks/java/library/j-jtp0730/index.html
[3] https://www.codeproject.com/Articles/3607/Work-Queue
[4] David R.Butenhof. POSIX多线程程序设计