nginx源码分析线程池详解
一、前言
nginx是采用多进程模型,master和worker之间主要通过pipe管道的方式进行通信,多进程的优势就在于各个进程互不影响。但是经常会有人问道,nginx为什么不采用多线程模型(这个除了之前一篇文章讲到的情况,别的只有去问作者了,HAHA)。其实,nginx代码中提供了一个thread_pool(线程池)的核心模块来处理多任务的。下面就本人对该thread_pool这个模块的理解来跟大家做些分享(文中错误、不足还请大家指出,谢谢)
二、thread_pool线程池模块介绍
nginx的主要功能都是由一个个模块构成的,thread_pool也不例外。线程池主要用于读取、发送文件等IO操作,避免慢速IO影响worker的正常运行。先引用一段官方的配置示例
1
2
3
|
Syntax: thread_pool name threads=number [max_queue=number]; Default: thread_pool default threads=32 max_queue=65536; Context: main |
根据上述的配置说明,thread_pool是有名字的,上面的线程数目以及队列大小都是指每个worker进程中的线程,而不是所有worker中线程的总数。一个线程池中所有的线程共享一个队列,队列中的最大人数数量为上面定义的max_queue,如果队列满了的话,再往队列中添加任务就会报错。
根据之前讲到过的模块初始化流程(在master启动worker之前) create_conf--> command_set函数-->init_conf,下面就按照这个流程看看thread_pool模块的初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
/******************* nginx /src/core/ngx_thread_pool .c ************************/ // 创建线程池所需的基础结构 static void * ngx_thread_pool_create_conf(ngx_cycle_t *cycle) { ngx_thread_pool_conf_t *tcf; // 从cycle->pool指向的内存池中申请一块内存 tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t)); if (tcf == NULL) { return NULL; } // 先申请包含4个ngx_thread_pool_t指针类型元素的数组 //ngx_thread_pool_t 结构体中保存了一个线程池相关的信息 if (ngx_array_init(&tcf->pools, cycle->pool, 4, sizeof(ngx_thread_pool_t *)) != NGX_OK) { return NULL; } return tcf; } // 解析处理配置文件中thread_pool的配置,并将相关信息保存的ngx_thread_pool_t中 static char * ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { ngx_str_t *value; ngx_uint_t i; ngx_thread_pool_t *tp; value = cf->args->elts; // 根据thread_pool配置中的name作为线程池的唯一标识(如果重名,只有第一个有效) // 申请ngx_thread_pool_t结构保存线程池的相关信息 // 由此可见,nginx支持配置多个name不同的线程池 tp = ngx_thread_pool_add(cf, &value[1]); ....... // 处理thread_pool配置行的所有元素 for (i = 2; i < cf->args->nelts; i++) { // 检查配置的线程数 if (ngx_strncmp(value[i].data, "threads=" , 8) == 0) { ....... } // 检查配置的最大队列长度 if (ngx_strncmp(value[i].data, "max_queue=" , 10) == 0) { ....... } } ...... } // 判断包含多个线程池的数组中的各个线程池的配置是否正确 static char * ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf) { .... ngx_thread_pool_t **tpp; tpp = tcf->pools.elts; // 遍历数组中所有的线程池配置,并检查其正确性 for (i = 0; i < tcf->pools.nelts; i++) { ..... } return NGX_CONF_OK; } |
在上述的流程走完之后,nginx的master就保存了一份所有线程池的配置(tcf->pools),这份配置在创建worker时也会被继承。然后每个worker中都调用各个核心模块的init_process函数(如果有的话)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
/******************* nginx /src/core/ngx_thread_pool .c ************************/ // 创建线程池所需的基础结构 static ngx_int_t ngx_thread_pool_init_worker(ngx_cycle_t *cycle) { ngx_uint_t i; ngx_thread_pool_t **tpp; ngx_thread_pool_conf_t *tcf; // 如果不是worker或者只有一个worker就不起用线程池 if (ngx_process != NGX_PROCESS_WORKER && ngx_process != NGX_PROCESS_SINGLE) { return NGX_OK; } // 初始化任务队列 ngx_thread_pool_queue_init(&ngx_thread_pool_done); tpp = tcf->pools.elts; for (i = 0; i < tcf->pools.nelts; i++) { // 初始化各个线程池 if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) { return NGX_ERROR; } } return NGX_OK; } // 线程池初始化 static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool) { ..... // 初始化任务队列 ngx_thread_pool_queue_init(&tp->queue); // 创建线程锁 if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) { return NGX_ERROR; } // 创建线程条件变量 if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) { (void) ngx_thread_mutex_destroy(&tp->mtx, log); return NGX_ERROR; } ...... for (n = 0; n < tp->threads; n++) { // 创建线程池中的每个线程 err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp); if (err) { ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_create() failed" ); return NGX_ERROR; } } ...... } // 线程池中线程处理主函数 static void *ngx_thread_pool_cycle(void *data) { ...... for ( ;; ) { // 阻塞的方式获取线程锁 if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) { return NULL; } /* the number may become negative */ tp->waiting--; // 如果任务队列为空,就cond_wait阻塞等待有新任务时调用cond_signal /broadcast 触发 while (tp->queue.first == NULL) { if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log) != NGX_OK) { (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log); return NULL; } } // 从任务队列中获取task,并将其从队列中移除 task = tp->queue.first; tp->queue.first = task->next; if (tp->queue.first == NULL) { tp->queue.last = &tp->queue.first; } if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) { return NULL; } ...... //task 的处理函数 task->handler(task->ctx, tp->log); ..... ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048); // 将经过预处理的任务添加到 done 队列中等待调用event的回调函数继续处理 *ngx_thread_pool_done.last = task; ngx_thread_pool_done.last = &task->next; // 防止编译器优化,保证解锁操作是在上述语句执行完毕后再去执行的 ngx_memory_barrier(); ngx_unlock(&ngx_thread_pool_done_lock); (void) ngx_notify(ngx_thread_pool_handler); } } // 处理pool_done队列上task中包含的每个event事件 static void ngx_thread_pool_handler(ngx_event_t *ev) { ..... ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048); // 获取任务链表的头部 task = ngx_thread_pool_done.first; ngx_thread_pool_done.first = NULL; ngx_thread_pool_done.last = &ngx_thread_pool_done.first; ngx_memory_barrier(); ngx_unlock(&ngx_thread_pool_done_lock); while (task) { ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, "run completion handler for task #%ui" , task-> id ); // 遍历队列中的所有任务事件 event = &task->event; task = task->next; event->complete = 1; event->active = 0; // 调用event对应的处理函数有针对性的进行处理 event->handler(event); } } |
三、thread_pool线程池使用示例
根据之前所讲到的,nginx中的线程池主要是用于操作文件的IO操作。所以,在nginx中自带的模块ngx_http_file_cache.c文件中看到了线程池的使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
/*********************** nginx/src/os/unix/ngx_files.c **********************/ //file_cache模块的处理函数(涉及到了线程池) static ssize_t ngx_http_file_cache_aio_read(ngx_http_request_t *r, ngx_http_cache_t *c) { ....... # if (NGX_THREADS) if (clcf->aio == NGX_HTTP_AIO_THREADS) { c->file.thread_task = c->thread_task; //这里注册的函数在下面语句中的ngx_thread_read函数中被调用 c->file.thread_handler = ngx_http_cache_thread_handler; c->file.thread_ctx = r; //根据任务的属性,选择正确的线程池,并初始化task结构体中的各个成员 n = ngx_thread_read(&c->file, c->buf->pos, c->body_start, 0 , r->pool); c->thread_task = c->file.thread_task; c->reading = (n == NGX_AGAIN); return n; } #endif return ngx_read_file(&c->file, c->buf->pos, c->body_start, 0 ); } //task任务的处理函数 static ngx_int_t ngx_http_cache_thread_handler(ngx_thread_task_t *task, ngx_file_t *file) { ....... tp = clcf->thread_pool; ....... task->event.data = r; //注册thread_event_handler函数,该函数在处理pool_done队列中event事件时被调用 task->event.handler = ngx_http_cache_thread_event_handler; //将任务放到线程池的任务队列中 if (ngx_thread_task_post(tp, task) != NGX_OK) { return NGX_ERROR; } ...... } /*********************** nginx/src/core/ngx_thread_pool.c **********************/ //添加任务到队列中 ngx_int_t ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task) { //如果当前的任务正在处理就退出 if (task->event.active) { ngx_log_error(NGX_LOG_ALERT, tp->log, 0 , "task #%ui already active" , task->id); return NGX_ERROR; } if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) { return NGX_ERROR; } //判断当前线程池等待的任务数量与最大队列长度的关系 if (tp->waiting >= tp->max_queue) { ( void ) ngx_thread_mutex_unlock(&tp->mtx, tp->log); ngx_log_error(NGX_LOG_ERR, tp->log, 0 , "thread pool \"%V\" queue overflow: %i tasks waiting" , &tp->name, tp->waiting); return NGX_ERROR; } //激活任务 task->event.active = 1 ; task->id = ngx_thread_pool_task_id++; task->next = NULL; //通知阻塞的线程有新事件加入,可以解除阻塞 if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) { ( void ) ngx_thread_mutex_unlock(&tp->mtx, tp->log); return NGX_ERROR; } *tp->queue.last = task; tp->queue.last = &task->next; tp->waiting++; ( void ) ngx_thread_mutex_unlock(&tp->mtx, tp->log); ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0 , "task #%ui added to thread pool \"%V\"" , task->id, &tp->name); return NGX_OK; } |
上面示例基本展示了nginx目前对线程池的使用方法,采用线程池来处理IO这类慢速操作可以提升worker的主线程的执行效率。当然,用户自己在开发模块时,也可以参照file_cache模块中使用线程池的方法来调用多线程提升程序性能。(欢迎大家多多批评指正)
感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!
原文链接:http://www.cnblogs.com/sxhlinux/p/6906490.html