一、简介
多线程编程技术可以实现代码并行性,优化处理能力,同时功能的更小划分可以使代码的可重用性更好。Python中threading和Queue模块可以用来实现多线程编程。
二、详解
1、线程和进程
进程(有时被称为重量级进程)是程序的一次执行。每个进程都有自己的地址空间、内存、数据栈以及其它记录其运行轨迹的辅助数据。操作系统管理在其上运行的所有进程,并为这些进程公平地分配时间。进程也可以通过fork和spawn操作来完成其它的任务,不过各个进程有自己的内存空间、数据栈等,所以只能使用进程间通讯(IPC),而不能直接共享信息。
线程(有时被称为轻量级进程)跟进程有些相似,不同的是所有的线程运行在同一个进程中,共享相同的运行环境。它们可以想像成是在主进程或“主线程”中并行运行的“迷你进程”。线程有开始、顺序执行和结束三部分,它有一个自己的指令指针,记录自己运行到什么地方。线程的运行可能被抢占(中断)或暂时的被挂起(也叫睡眠)让其它的线程运行,这叫做让步。一个进程中的各个线程之间共享同一片数据空间,所以线程之间可以比进程之间更方便地共享数据以及相互通讯。线程一般都是并发执行的,正是由于这种并行和数据共享的机制使得多个任务的合作变为可能。实际上,在单CPU的系统中,真正的并发是不可能的,每个线程会被安排成每次只运行一小会,然后就把CPU让出来,让其它的线程去运行。在进程的整个运行过程中,每个线程都只做自己的事,在需要的时候跟其它的线程共享运行的结果。多个线程共同访问同一片数据不是完全没有危险的,由于数据访问的顺序不一样,有可能导致数据结果的不一致的问题,这叫做竞态条件。而大多数线程库都带有一系列的同步原语,来控制线程的执行和数据的访问。
2、使用线程
(1)全局解释器锁(GIL)
Python代码的执行由Python虚拟机(也叫解释器主循环)来控制。Python在设计之初就考虑到要在主循环中,同时只有一个线程在执行。虽然 Python 解释器中可以“运行”多个线程,但在任意时刻只有一个线程在解释器中运行。
对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。在多线程环境中,Python 虚拟机按以下方式执行:a、设置 GIL;b、切换到一个线程去运行;c、运行指定数量的字节码指令或者线程主动让出控制(可以调用 time.sleep(0));d、把线程设置为睡眠状态;e、解锁 GIL;d、再次重复以上所有步骤。
在调用外部代码(如 C/C++扩展函数)的时候,GIL将会被锁定,直到这个函数结束为止(由于在这期间没有Python的字节码被运行,所以不会做线程切换)编写扩展的程序员可以主动解锁GIL。
(2)退出线程
当一个线程结束计算,它就退出了。线程可以调用thread.exit()之类的退出函数,也可以使用Python退出进程的标准方法,如sys.exit()或抛出一个SystemExit异常等。不过,不可以直接“杀掉”("kill")一个线程。
不建议使用thread模块,很明显的一个原因是,当主线程退出的时候,所有其它线程没有被清除就退出了。另一个模块threading就能确保所有“重要的”子线程都退出后,进程才会结束。
(3)Python的线程模块
Python提供了几个用于多线程编程的模块,包括thread、threading和Queue等。thread和threading模块允许程序员创建和管理线程。thread模块提供了基本的线程和锁的支持,threading提供了更高级别、功能更强的线程管理的功能。Queue模块允许用户创建一个可以用于多个线程之间共享数据的队列数据结构。
避免使用thread模块,因为更高级别的threading模块更为先进,对线程的支持更为完善,而且使用thread模块里的属性有可能会与threading出现冲突;其次低级别的thread模块的同步原语很少(实际上只有一个),而threading模块则有很多;再者,thread模块中当主线程结束时,所有的线程都会被强制结束掉,没有警告也不会有正常的清除工作,至少threading模块能确保重要的子线程退出后进程才退出。
3、thread模块
thread模块除了产生线程外,thread模块也提供了基本的同步数据结构锁对象(lock object也叫原语锁、简单锁、互斥锁、互斥量、二值信号量)。同步原语与线程的管理是密不可分的。
常用的线程函数以及LockType类型的锁对象的方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
#!/usr/bin/env python import thread from time import sleep, ctime def loop0(): print '+++start loop 0 at:' , ctime() sleep( 4 ) print '+++loop 0 done at:' , ctime() def loop1(): print '***start loop 1 at:' , ctime() sleep( 2 ) print '***loop 1 done at:' , ctime() def main(): print '------starting at:' , ctime() thread.start_new_thread(loop0, ()) thread.start_new_thread(loop1, ()) sleep( 6 ) print '------all DONE at:' , ctime() if __name__ = = '__main__' : main() |
thread 模块提供的简单的多线程的机制,两个循环并发地被执行,总的运行时间为最慢的那个线程的运行时间(主线程6s),而不是所有的线程的运行时间之和。start_new_thread()要求要有前两个参数,就算想要运行的函数不要参数,也要传一个空的元组。
sleep(6)是让主线程停下来,主线程一旦运行结束,就关闭运行着其他两个线程。但这可能造成主线程过早或过晚退出,那就要使用线程锁,可以在两个子线程都退出后,主线程立即退出。
在CODE上查看代码片派生到我的代码片
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
#!/usr/bin/env python import thread from time import sleep, ctime loops = [ 4 , 2 ] def loop(nloop, nsec, lock): print '+++start loop:' , nloop, 'at:' , ctime() sleep(nsec) print '+++loop:' , nloop, 'done at:' , ctime() lock.release() def main(): print '---starting threads...' locks = [] nloops = range ( len (loops)) for i in nloops: lock = thread.allocate_lock() lock.acquire() locks.append(lock) for i in nloops: thread.start_new_thread(loop, (i, loops[i], locks[i])) for i in nloops: while locks[i].locked(): pass print '---all DONE at:' , ctime() if __name__ = = '__main__' : main() |
4、threading模块
更高级别的threading模块,它不仅提供了Thread类,还提供了各种非常好用的同步机制。threading 模块里所有的对象:
thread模块不支持守护线程,当主线程退出时,所有的子线程不论它们是否还在工作,都会被强行退出。而threading模块支持守护线程,守护线程一般是一个等待客户请求的服务器,如果没有客户提出请求它就在那等着,如果设定一个线程为守护线程,就表示这个线程是不重要的,在进程退出的时候,不用等待这个线程退出。如果主线程退出不用等待那些子线程完成,那就设定这些线程的daemon属性,即在线程thread.start()开始前,调用setDaemon()函数设定线程的daemon标志(thread.setDaemon(True))就表示这个线程“不重要”。如果想要等待子线程完成再退出,那就什么都不用做或者显式地调用thread.setDaemon(False)以保证其daemon标志为False,可以调用thread.isDaemon()函数来判断其daemon标志的值。新的子线程会继承其父线程的daemon标志,整个Python会在所有的非守护线程退出后才会结束,即进程中没有非守护线程存在的时候才结束。
(1)threading的Thread类
它有很多thread模块里没有的函数,Thread对象的函数:
创建一个Thread的实例,传给它一个函数
在CODE上查看代码片派生到我的代码片
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
#!/usr/bin/env python import threading from time import sleep, ctime loops = [ 4 , 2 ] def loop(nloop, nsec): print '+++start loop:' , nloop, 'at:' , ctime() sleep(nsec) print '+++loop:' , nloop, 'done at:' , ctime() def main(): print '---starting at:' , ctime() threads = [] nloops = range ( len (loops)) for i in nloops: t = threading.Thread(target = loop, args = (i, loops[i])) threads.append(t) for i in nloops: # start threads threads[i].start() for i in nloops: # wait for all threads[i].join() # threads to finish print '---all DONE at:' , ctime() if __name__ = = '__main__' : main() |
实例化一个Thread(调用 Thread())与调用thread.start_new_thread()之间最大的区别就是,新的线程不会立即开始。在创建线程对象,但不想马上开始运行线程的时候,这是一个很有用的同步特性。所有的线程都创建了之后,再一起调用 start()函数启动,而不是创建一个启动一个。而且也不用再管理一堆锁(分配锁、获得锁、释放锁、检查锁的状态等),只要简单地对每个线程调用join()主线程等待子线程的结束即可。join()还可以设置timeout的参数,即主线程等到超时为止。
join()的另一个比较重要的方面是它可以完全不用调用,一旦线程启动后,就会一直运行,直到线程的函数结束,退出为止。如果主线程除了等线程结束外,还有其它的事情要做,那就不用调用 join(),只有在等待线程结束的时候才调用join()。
创建一个Thread的实例,传给它一个可调用的类对象
[html] view plaincopy在CODE上查看代码片派生到我的代码片
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
#!/usr/bin/env python import threading from time import sleep, ctime loops = [ 4 , 2 ] class ThreadFunc( object ): def __init__( self , func, args, name = ''): self .name = name self .func = func self .args = args def __call__( self ): apply ( self .func, self .args) def loop(nloop, nsec): print 'start loop' , nloop, 'at:' , ctime() sleep(nsec) print 'loop' , nloop, 'done at:' , ctime() def main(): print 'starting at:' , ctime() threads = [] nloops = range ( len (loops)) for i in nloops: # create all threads t = threading.Thread(target = ThreadFunc(loop, (i, loops[i]), loop.__name__)) threads.append(t) for i in nloops: # start all threads threads[i].start() for i in nloops: # wait for completion threads[i].join() print 'all DONE at:' , ctime() if __name__ = = '__main__' : main() |
与传一个函数很相似的另一个方法是在创建线程的时候,传一个可调用的类的实例供线程启动的时候执行,这是多线程编程的一个更为面向对象的方法。相对于一个或几个函数来说,类对象里可以使用类的强大的功能。创建新线程的时候,Thread对象会调用ThreadFunc对象,这时会用到一个特殊函数__call__()。由于已经有了要用的参数,所以就不用再传到Thread()的构造函数中。由于有一个参数的元组,这时要使用apply()函数或使用self.res = self.func(*self.args)。
从Thread派生出一个子类,创建一个这个子类的实例
在CODE上查看代码片派生到我的代码片
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
#!/usr/bin/env python import threading from time import sleep, ctime loops = [ 4 , 2 ] class MyThread(threading.Thread): def __init__( self , func, args, name = ''): threading.Thread.__init__( self ) self .name = name self .func = func self .args = args def getResult( self ): return self .res def run( self ): print 'starting' , self .name, 'at:' , ctime() self .res = apply ( self .func, self .args) print self .name, 'finished at:' , ctime() def loop(nloop, nsec): print 'start loop' , nloop, 'at:' , ctime() sleep(nsec) print 'loop' , nloop, 'done at:' , ctime() def main(): print 'starting at:' , ctime() threads = [] nloops = range ( len (loops)) for i in nloops: t = MyThread(loop, (i, loops[i]), loop.__name__) threads.append(t) for i in nloops: threads[i].start() for i in nloops: threads[i].join() print 'all DONE at:' , ctime() if __name__ = = '__main__' : main() |
子类化Thread类,MyThread子类的构造函数一定要先调用基类的构造函数,特殊函数__call__()在子类中,名字要改为run()。在 MyThread类中,加入一些用于调试的输出信息,把代码保存到myThread模块中,并导入这个类。除使用apply()函数来运行这些函数之外,还可以把结果保存到实现的self.res属性中,并创建一个新的函数getResult()来得到结果。
(2)threading模块中的其它函数
5、Queue模块
常用的 Queue 模块的属性:
Queue模块可以用来进行线程间通讯,让各个线程之间共享数据。Queue解决生产者-消费者的问题,现在创建一个队列,让生产者线程把新生产的货物放进去供消费者线程使用。生产者生产货物所要花费的时间无法预先确定,消费者消耗生产者生产的货物的时间也是不确定的。
在CODE上查看代码片派生到我的代码片
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
#!/usr/bin/env python from random import randint from time import sleep from Queue import Queue from myThread import MyThread def writeQ(queue): print '+++producing object for Q...' , queue.put( 'xxx' , 1 ) print "+++size now:" , queue.qsize() def readQ(queue): val = queue.get( 1 ) print '---consumed object from Q... size now' , \ queue.qsize() def writer(queue, loops): for i in range (loops): writeQ(queue) sleep(randint( 1 , 3 )) def reader(queue, loops): for i in range (loops): readQ(queue) sleep(randint( 2 , 5 )) funcs = [writer, reader] nfuncs = range ( len (funcs)) def main(): nloops = randint( 2 , 5 ) q = Queue( 32 ) threads = [] for i in nfuncs: t = MyThread(funcs[i], (q, nloops), \ funcs[i].__name__) threads.append(t) for i in nfuncs: threads[i].start() for i in nfuncs: threads[i].join() print '***all DONE' if __name__ = = '__main__' : main() |
这个实现中使用了Queue对象和随机地生产(和消耗)货物的方式。生产者和消费者相互独立并且并发地运行,它们不一定是轮流执行的(随机数模拟)。writeQ()和readQ()函数分别用来把对象放入队列和消耗队列中的一个对象,在这里使用字符串'xxx'来表示队列中的对象。writer()函数就是一次往队列中放入一个对象,等待一会然后再做同样的事,一共做指定的次数,这个次数是由脚本运行时随机生成的。reader()函数做的事比较类似,只是它是用来消耗对象的。
6、线程相关模块
多线程相关的标准库模块:
三、总结
(1)一个要完成多项任务的程序,可以考虑每个任务使用一个线程,这样的程序在设计上相对于单线程做所有事的程序来说,更为清晰明了。
(2)单线程的程序在程序性能上的限制,尤其在有相互独立、运行时间不确定、多个任务的程序里,而把多个任务分隔成多个线程同时运行会比顺序运行速度更快。由于Python解释器是单线程的,所以不是所有的程序都能从多线程中得到好处。
(3)若有不足,请留言,在此先感谢!