本文为大家分享了用 Python 实现的一个多线程网页下载器,供大家参考,具体内容如下:
这是一个有着真实需求的实现,我的用途是拿它来通过 HTTP 方式向服务器提交游戏数据。把它放上来也是想大家帮忙挑刺,找找 bug,让它工作得更好。
keywords:python,http,multi-threads,thread,threading,httplib,urllib,urllib2,Queue,http pool,httppool
废话少说,上源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
# -*- coding:utf-8 -*- import urllib, httplib import thread import time from Queue import Queue, Empty, Full HEADERS = { "Content-type" : "application/x-www-form-urlencoded" , 'Accept-Language' : 'zh-cn' , 'User-Agent' : 'Mozilla/4.0 (compatible; MSIE 6.0;Windows NT 5.0)' , "Accept" : "text/plain" } UNEXPECTED_ERROR = - 1 POST = 'POST' GET = 'GET' def base_log(msg): print msg def base_fail_op(task, status, log): log( 'fail op. task = %s, status = %d' % ( str (task), status)) def get_remote_data(tasks, results, fail_op = base_fail_op, log = base_log): while True : task = tasks.get() try : tid = task[ 'id' ] hpt = task[ 'conn_args' ] # hpt <= host:port, timeout except KeyError, e: log( str (e)) continue log( 'thread_%s doing task %d' % (thread.get_ident(), tid)) #log('hpt = ' + str(hpt)) conn = httplib.HTTPConnection( * * hpt) try : params = task[ 'params' ] except KeyError, e: params = {} params = urllib.urlencode(params) #log('params = ' + params) try : method = task[ 'method' ] except KeyError: method = 'GET' #log('method = ' + method) try : url = task[ 'url' ] except KeyError: url = '/' #log('url = ' + url) headers = HEADERS try : tmp = task[ 'headers' ] except KeyError, e: tmp = {} headers.update(tmp) #log('headers = ' + str(headers)) headers[ 'Content-Length' ] = len (params) try : if method = = POST: conn.request(method, url, params, headers) else : conn.request(method, url + params) response = conn.getresponse() except Exception, e: log( 'request failed. method = %s, url = %s, params = %s headers = %s' % ( method, url, params, headers)) log( str (e)) fail_op(task, UNEXPECTED_ERROR, log) continue if response.status ! 
= httplib.OK: fail_op(task, response.status, log) continue data = response.read() results.put((tid, data), True ) class HttpPool( object ): def __init__( self , threads_count, fail_op, log): self ._tasks = Queue() self ._results = Queue() for i in xrange (threads_count): thread.start_new_thread(get_remote_data, ( self ._tasks, self ._results, fail_op, log)) def add_task( self , tid, host, url, params, headers = {}, method = 'GET' , timeout = None ): task = { 'id' : tid, 'conn_args' : { 'host' : host} if timeout is None else { 'host' : host, 'timeout' : timeout}, 'headers' : headers, 'url' : url, 'params' : params, 'method' : method, } try : self ._tasks.put_nowait(task) except Full: return False return True def get_results( self ): results = [] while True : try : res = self ._results.get_nowait() except Empty: break results.append(res) return results def test_google(task_count, threads_count): hp = HttpPool(threads_count, base_fail_op, base_log) for i in xrange (task_count): if hp.add_task(i, 'www.google.cn' , '/search?' , { 'q' : 'lai' }, # method = 'POST' ): print 'add task successed.' while True : results = hp.get_results() if not results: time.sleep( 1.0 * random.random()) for i in results: print i[ 0 ], len (i[ 1 ]) # print unicode(i[1], 'gb18030') if __name__ = = '__main__' : import sys, random task_count, threads_count = int (sys.argv[ 1 ]), int (sys.argv[ 2 ]) test_google(task_count, threads_count) |
有兴趣想尝试运行的朋友,可以把它保存为 xxxx.py,然后执行 python xxxx.py 10 4,其中 10 表示向 google.cn 请求 10 次查询,4 表示由 4 条线程来执行这些任务。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/gzlaiyonghao/article/details/4083852