本文实例讲述了Python自定义线程池实现方法。分享给大家供大家参考,具体如下:
关于python的多线程,由与GIL的存在被广大群主所诟病,说python的多线程不是真正的多线程。但多线程处理IO密集的任务效率还是可以杠杠的。
我实现的这个线程池其实是根据银角的思路来实现的。
主要思路:
任务获取和执行:
1、任务加入队列,等待线程来获取并执行。
2、按需生成线程,每个线程循环取任务。
线程销毁:
1、获取任务是终止符时,线程停止。
2、线程池close()时,向任务队列加入和已生成线程等量的终止符。
3、线程池terminate()时,设置线程下次任务取到为终止符。
流程概要设计:
详细代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
import threading import contextlib from Queue import Queue import time class ThreadPool( object ): def __init__( self , max_num): self .StopEvent = 0 #线程任务终止符,当线程从队列获取到StopEvent时,代表此线程可以销毁。可设置为任意与任务有区别的值。 self .q = Queue() self .max_num = max_num #最大线程数 self .terminal = False #是否设置线程池强制终止 self .created_list = [] #已创建线程的线程列表 self .free_list = [] #空闲线程的线程列表 self .Deamon = False #线程是否是后台线程 def run( self , func, args, callback = None ): """ 线程池执行一个任务 :param func: 任务函数 :param args: 任务函数所需参数 :param callback: :return: 如果线程池已经终止,则返回True否则None """ if len ( self .free_list) = = 0 and len ( self .created_list) < self .max_num: self .create_thread() task = (func, args, callback,) self .q.put(task) def create_thread( self ): """ 创建一个线程 """ t = threading.Thread(target = self .call) t.setDaemon( self .Deamon) t.start() self .created_list.append(t) #将当前线程加入已创建线程列表created_list def call( self ): """ 循环去获取任务函数并执行任务函数 """ current_thread = threading.current_thread() #获取当前线程对象· event = self .q.get() #从任务队列获取任务 while event ! = self .StopEvent: #判断获取到的任务是否是终止符 func, arguments, callback = event #从任务中获取函数名、参数、和回调函数名 try : result = func( * arguments) func_excute_status = True #func执行成功状态 except Exception as e: func_excute_status = False result = None print '函数执行产生错误' , e #打印错误信息 if func_excute_status: #func执行成功后才能执行回调函数 if callback is not None : #判断回调函数是否是空的 try : callback(result) except Exception as e: print '回调函数执行产生错误' , e # 打印错误信息 with self .worker_state( self .free_list,current_thread): #执行完一次任务后,将线程加入空闲列表。然后继续去取任务,如果取到任务就将线程从空闲列表移除 if self .terminal: #判断线程池终止命令,如果需要终止,则使下次取到的任务为StopEvent。 event = self .StopEvent else : #否则继续获取任务 event = self .q.get() # 当线程等待任务时,q.get()方法阻塞住线程,使其持续等待 else : #若线程取到的任务是终止符,就销毁线程 #将当前线程从已创建线程列表created_list移除 self .created_list.remove(current_thread) def close( self ): """ 执行完所有的任务后,所有线程停止 """ full_size = len ( self .created_list) #按已创建的线程数量往线程队列加入终止符。 while full_size: self .q.put( self .StopEvent) full_size - = 1 def terminate( self ): """ 无论是否还有任务,终止线程 """ self .terminal = True while self .created_list: self .q.put( self .StopEvent) self .q.queue.clear() #清空任务队列 def join( self ): """ 阻塞线程池上下文,使所有线程执行完后才能继续 """ for t in self .created_list: t.join() @contextlib .contextmanager #上下文处理器,使其可以使用with语句修饰 def worker_state( self , state_list, worker_thread): """ 用于记录线程中正在等待的线程数 """ state_list.append(worker_thread) try : yield finally : state_list.remove(worker_thread) if __name__ = = '__main__' : def Foo(arg): return arg # time.sleep(0.1) def Bar(res): print res pool = ThreadPool( 5 ) # pool.Deamon=True#需在pool.run之前设置 for i in range ( 1000 ): pool.run(func = Foo,args = (i,),callback = Bar) pool.close() pool.join() # pool.terminate() print "任务队列里任务数%s" % pool.q.qsize() print "当前存活子线程数量:%d" % threading.activeCount() print "当前线程创建列表:%s" % pool.created_list print "当前线程创建列表:%s" % pool.free_list |
关于上下文处理:
来个简单例子说明:
下面的代码手动自定义了一个myopen方法,模拟我们常见的with open() as f:语句。具体的contextlib模块使用,会单独开章来将。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
# coding:utf-8 import contextlib @contextlib .contextmanager #定义该函数支持上下文with语句 def myopen(filename,mode): f = open (filename,mode) try : yield f.readlines() #正常执行返回f.readlines() except Exception as e: print e finally : f.close() #最后在with代码快执行完毕后返回执行finally下的f.close()实现关闭文件 if __name__ = = '__main__' : with myopen(r 'c:\ip1.txt' , 'r' ) as f: for line in f: print line |
希望本文所述对大家Python程序设计有所帮助。
原文链接:http://www.cnblogs.com/tkqasn/p/5711593.html