本文实例讲述了Python 实现的微信爬虫。分享给大家供大家参考,具体如下:
单线程版:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
import urllib.request import urllib.parse import urllib.error import re,time headers = ( "User-Agent" , "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3107.4 Safari/537.36" ) operner = urllib.request.build_opener() operner.addheaders = [headers] urllib.request.install_opener(operner) list_url = [] ###使用代理获取网页url内容 def use_proxy(url): try : # proxy = urllib.request.ProxyHandler({'http':proxy_addr}) ##使用代理版 # operner = urllib.request.build_opener() # urllib.request.install_opener(operner) headers = ( "User-Agent" , "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3107.4 Safari/537.36" ) operner = urllib.request.build_opener() operner.addheaders = [headers] urllib.request.install_opener(operner) data = urllib.request.urlopen(url).read().decode( 'utf-8' ) # print (data) return data except urllib.error.URLError as e: if hasattr (e, "code" ): print (e.code) elif hasattr (e, "reason" ): print (e.reason) except Exception as e: print ( "exception" + str (e)) time.sleep( 1 ) ##获取要爬取的url def get_url(key, pagestart, pageend): try : keycode = urllib.parse.quote(key) for page in range (pagestart, pageend + 1 ): url = "http://weixin.sogou.com/weixin?query=%s&_sug_type_=&s_from=input&_sug_=n&type=%d&page=1&ie=utf8" % ( keycode, page) data1 = use_proxy(url) #print("data1的内容是", data1) listurl_pattern = '<h3>.*?("http://.*?)</h3>' result = re. compile (listurl_pattern, re.S).findall(data1) for i in range ( len (result)): res = result[i].replace( "amp;" , " ").split(" ")[0].replace(" \" ", " ") list_url.append(res) #print(list_url) return list_url except urllib.error.URLError as e: if hasattr (e, "code" ): print (e.code) elif hasattr (e, "reason" ): print (e.reason) except Exception as e: print ( "exception:" , e) ##通过获取的url爬行内容数据并处理 def get_url_content(list_url): fh1 = open ( "D:\\python-script\\1.html" , 'wb' ) html1 = '''<!DOCTYPE html>\n<html xmlns="http://www.w3.org/1999/xhmtl">\n<head>\n<meta http-equiv="Content-Type" content="text/html; charset=utf-8">\n<title>微信文章</title></head>\n<body>''' fh1.write(html1.encode( "utf-8" )) fh1.close() fh = open ( "D:\\python-script\\1.html" , 'ab' ) for url in list_url: data_content = use_proxy(url) #print (data_content) #sys.exit() title_pattern = '<h2.*>.*?</h2>' result_title = re. compile (title_pattern, re.S).findall(data_content) ##标题(str) res_title = result_title[ 0 ].replace( "<h2 class=\"rich_media_title\" id=\"activity-name\">" , " ").replace(" < / h2>", "").strip() content_pattern = 'id="js_content">(.*?)<div class="rich_media_tool" id="js_sg_bar">' content = re. compile (content_pattern, re.S).findall(data_content) try : fh.write(res_title.encode( "utf-8" )) for i in content: fh.write(i.strip().encode( "utf-8" )) except UnicodeEncodeError as e: continue fh.write( "</body></html>" .encode( "utf-8" )) if __name__ = = '__main__' : pagestart = 1 pageend = 2 key = "人工智能" get_url(key, pagestart, pageend) get_url_content(list_url) |
多线程版:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
import urllib.request import urllib.parse import urllib.error import re,time import queue import threading headers = ( "User-Agent" , "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3107.4 Safari/537.36" ) operner = urllib.request.build_opener() operner.addheaders = [headers] urllib.request.install_opener(operner) urlque = queue.Queue() list_url = [] ###使用代理获取网页url内容 def use_proxy(url): try : # proxy = urllib.request.ProxyHandler({'http':proxy_addr}) # operner = urllib.request.build_opener() # urllib.request.install_opener(operner) headers = ( "User-Agent" , "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3107.4 Safari/537.36" ) operner = urllib.request.build_opener() operner.addheaders = [headers] urllib.request.install_opener(operner) data = urllib.request.urlopen(url).read().decode( 'utf-8' ) #print (data) return data except urllib.error.URLError as e: if hasattr (e, "code" ): print (e.code) elif hasattr (e, "reason" ): print (e.reason) except Exception as e: print ( "exception" + str (e)) time.sleep( 1 ) ###获取文章的url连接,并将连接加入到队列 class get_url(threading.Thread): def __init__( self ,key,pagestart,pageend,urlque): threading.Thread.__init__( self ) self .pagestart = pagestart self .pageend = pageend self .key = key self .urlque = urlque def run( self ): try : keycode = urllib.parse.quote( self .key) for page in range ( self .pagestart, self .pageend + 1 ): url = "http://weixin.sogou.com/weixin?query=%s&_sug_type_=&s_from=input&_sug_=n&type=%d&page=1&ie=utf8" % (keycode,page) data = use_proxy(url) print ( "data1的内容是" ,data) listurl_pattern = '<h3>.*?("http://.*?)</h3>' result = re. compile (listurl_pattern,re.S).findall(data) print (result) if len (result) = = 0 : print ( "没有可用的url" ) sys.exit() for i in range ( len (result)): res = result[i].replace( "amp;" ," ").split(" ")[0].replace(" \" " ," ") #list_url.append(res) #加入列表 self .urlque.put(res) ##加入队列 self .urlque.task_done() #return list_url except urllib.error.URLError as e: if hasattr (e, "code" ): print (e.code) elif hasattr (e, "reason" ): print (e.reason) except Exception as e: print ( "exception:" ,e) ##根据url获取文章内容 class get_url_content(threading.Thread): def __init__( self ,urlque): threading.Thread.__init__( self ) self .urlque = urlque def run( self ): fh1 = open ( "D:\\python-script\\1.html" , 'wb' ) html1 = '''<!DOCTYPE html>\n<html xmlns="http://www.w3.org/1999/xhmtl">\n<head>\n<meta http-equiv="Content-Type" content="text/html; charset=utf-8">\n<title>微信文章</title></head>\n<body>''' fh1.write(html1.encode( "utf-8" )) fh1.close() fh = open ( "D:\\python-script\\1.html" , 'ab' ) while True : try : url = self .urlque.get() data_content = use_proxy(url) title_pattern = '<h2.*>.*?</h2>' result_title = re. compile (title_pattern, re.S).findall(data_content) ##标题 res_title = result_title[ 0 ].replace( "<h2 class=\"rich_media_title\" id=\"activity-name\">" , " ").replace(" < / h2> "," ").strip() content_pattern = 'id="js_content">(.*?)<div class="rich_media_tool" id="js_sg_bar">' content = re. compile (content_pattern, re.S).findall(data_content) #c = '<p style="max-width: 100%;box-sizing: border-box;min-height: 1em;text-indent: 2em;word-wrap: break-word !important;">' # for i in content: # ##内容 # c_content=i.replace(c, "").replace("<br /></p>", "").replace("</p>", "") fh.write(res_title.encode( "utf-8" )) for i in content: fh.write(i.strip().encode( "utf-8" )) except UnicodeEncodeError as e: continue fh.close() class contrl(threading.Thread): def __init__( self ,urlqueue): threading.Thread.__init__( self ) self .urlqueue = urlqueue while True : print ( "程序正在执行" ) if self .urlqueue.empty(): time.sleep( 3 ) print ( "程序执行完毕" ) exit() if __name__ = = '__main__' : pagestart = 1 pageend = 2 key = "人工智能" get_url = get_url(key,pagestart,pageend,urlque) get_url.start() get_content = get_url_content(urlque) get_content.start() cntrol = contrl(urlque) cntrol.start() |
希望本文所述对大家Python程序设计有所帮助。
原文链接:https://www.cnblogs.com/FRESHMANS/p/8125594.html