脚本之家,脚本语言编程技术及教程分享平台!
分类导航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服务器之家 - 脚本之家 - Python - Python 微信爬虫完整实例【单线程与多线程】

Python 微信爬虫完整实例【单线程与多线程】

2021-08-02 00:26FRESHMANS Python

这篇文章主要介绍了Python 微信爬虫,结合完整实例形式分析了Python基于单线程与多线程模式爬取微信信息相关操作技巧,需要的朋友可以参考下

本文实例讲述了Python 实现的微信爬虫。分享给大家供大家参考,具体如下:

单线程版:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import urllib.request
import urllib.parse
import urllib.error
import re,time
headers = ("User-Agent",
      "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3107.4 Safari/537.36")
operner = urllib.request.build_opener()
operner.addheaders = [headers]
urllib.request.install_opener(operner)
list_url = []
###使用代理获取网页url内容
def use_proxy(url):
  try:
    # proxy = urllib.request.ProxyHandler({'http':proxy_addr})    ##使用代理版
    # operner = urllib.request.build_opener()
    # urllib.request.install_opener(operner)
    headers = ("User-Agent",
          "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3107.4 Safari/537.36")
    operner = urllib.request.build_opener()
    operner.addheaders = [headers]
    urllib.request.install_opener(operner)
    data = urllib.request.urlopen(url).read().decode('utf-8')
    # print (data)
    return data
  except urllib.error.URLError as e:
    if hasattr(e, "code"):
      print(e.code)
    elif hasattr(e, "reason"):
      print(e.reason)
  except Exception as e:
    print("exception" + str(e))
    time.sleep(1)
##获取要爬取的url
def get_url(key, pagestart, pageend):
  try:
    keycode = urllib.parse.quote(key)
    for page in range(pagestart, pageend + 1):
      url = "http://weixin.sogou.com/weixin?query=%s&_sug_type_=&s_from=input&_sug_=n&type=%d&page=1&ie=utf8" % (
      keycode, page)
      data1 = use_proxy(url)
      #print("data1的内容是", data1)
      listurl_pattern = '<h3>.*?("http://.*?)</h3>'
      result = re.compile(listurl_pattern, re.S).findall(data1)
      for i in range(len(result)):
        res = result[i].replace("amp;", "").split(" ")[0].replace("\"", "")
        list_url.append(res)
    #print(list_url)
    return list_url
  except urllib.error.URLError as e:
    if hasattr(e, "code"):
      print(e.code)
    elif hasattr(e, "reason"):
      print(e.reason)
  except Exception as e:
    print("exception:", e)
##通过获取的url爬行内容数据并处理
def get_url_content(list_url):
  fh1=open("D:\\python-script\\1.html", 'wb')
  html1 = '''<!DOCTYPE html>\n<html xmlns="http://www.w3.org/1999/xhmtl">\n<head>\n<meta http-equiv="Content-Type" content="text/html; charset=utf-8">\n<title>微信文章</title></head>\n<body>'''
  fh1.write(html1.encode("utf-8"))
  fh1.close()
  fh = open("D:\\python-script\\1.html", 'ab')
  for url in list_url:
    data_content = use_proxy(url)
    #print (data_content)
    #sys.exit()
    title_pattern = '<h2.*>.*?</h2>'
    result_title = re.compile(title_pattern, re.S).findall(data_content)
    ##标题(str)
    res_title = result_title[0].replace("<h2 class=\"rich_media_title\" id=\"activity-name\">", "").replace("</h2>",
                                             "").strip()
    content_pattern = 'id="js_content">(.*?)<div class="rich_media_tool" id="js_sg_bar">'
    content = re.compile(content_pattern, re.S).findall(data_content)
    try:
      fh.write(res_title.encode("utf-8"))
      for i in content:
        fh.write(i.strip().encode("utf-8"))
    except UnicodeEncodeError as e:
      continue
  fh.write("</body></html>".encode("utf-8"))
if __name__ == '__main__':
  pagestart = 1
  pageend = 2
  key = "人工智能"
  get_url(key, pagestart, pageend)
  get_url_content(list_url)

多线程版:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import urllib.request
import urllib.parse
import urllib.error
import re,time
import queue
import threading
headers = ("User-Agent","Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3107.4 Safari/537.36")
operner = urllib.request.build_opener()
operner.addheaders = [headers]
urllib.request.install_opener(operner)
urlque = queue.Queue()
list_url = []
###使用代理获取网页url内容
def use_proxy(url):
  try:
    # proxy = urllib.request.ProxyHandler({'http':proxy_addr})
    # operner = urllib.request.build_opener()
    # urllib.request.install_opener(operner)
    headers = ("User-Agent",
          "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3107.4 Safari/537.36")
    operner = urllib.request.build_opener()
    operner.addheaders = [headers]
    urllib.request.install_opener(operner)
    data = urllib.request.urlopen(url).read().decode('utf-8')
    #print (data)
    return data
  except urllib.error.URLError as e:
    if hasattr(e,"code"):
      print (e.code)
    elif hasattr(e,"reason"):
      print (e.reason)
  except Exception as e:
    print ("exception"+str(e))
    time.sleep(1)
###获取文章的url连接,并将连接加入到队列
class get_url(threading.Thread):
  def __init__(self,key,pagestart,pageend,urlque):
    threading.Thread.__init__(self)
    self.pagestart = pagestart
    self.pageend = pageend
    self.key = key
    self.urlque = urlque
  def run(self):
    try:
      keycode = urllib.parse.quote(self.key)
      for page in range(self.pagestart,self.pageend+1):
        url = "http://weixin.sogou.com/weixin?query=%s&_sug_type_=&s_from=input&_sug_=n&type=%d&page=1&ie=utf8" % (keycode,page)
        data = use_proxy(url)
        print ("data1的内容是",data)
        listurl_pattern = '<h3>.*?("http://.*?)</h3>'
        result = re.compile(listurl_pattern,re.S).findall(data)
        print (result)
        if len(result) == 0:
          print ("没有可用的url")
          sys.exit()
        for i in range(len(result)):
          res = result[i].replace("amp;","").split(" ")[0].replace("\"" ,"")
          #list_url.append(res)    #加入列表
          self.urlque.put(res)      ##加入队列
          self.urlque.task_done()
      #return list_url
    except urllib.error.URLError as e:
      if hasattr(e, "code"):
        print(e.code)
      elif hasattr(e, "reason"):
        print(e.reason)
    except Exception as e:
      print ("exception:",e)
##根据url获取文章内容
class get_url_content(threading.Thread):
  def __init__(self,urlque):
    threading.Thread.__init__(self)
    self.urlque = urlque
  def run(self):
    fh1 = open("D:\\python-script\\1.html", 'wb')
    html1 = '''<!DOCTYPE html>\n<html xmlns="http://www.w3.org/1999/xhmtl">\n<head>\n<meta http-equiv="Content-Type" content="text/html; charset=utf-8">\n<title>微信文章</title></head>\n<body>'''
    fh1.write(html1.encode("utf-8"))
    fh1.close()
    fh = open("D:\\python-script\\1.html", 'ab')
    while True:
      try:
        url = self.urlque.get()
        data_content = use_proxy(url)
        title_pattern = '<h2.*>.*?</h2>'
        result_title = re.compile(title_pattern, re.S).findall(data_content)
        ##标题
        res_title = result_title[0].replace("<h2 class=\"rich_media_title\" id=\"activity-name\">", "").replace("</h2>","").strip()
        content_pattern = 'id="js_content">(.*?)<div class="rich_media_tool" id="js_sg_bar">'
        content = re.compile(content_pattern, re.S).findall(data_content)
        #c = '<p style="max-width: 100%;box-sizing: border-box;min-height: 1em;text-indent: 2em;word-wrap: break-word !important;">'
        # for i in content:
        #   ##内容
        #   c_content=i.replace(c, "").replace("<br /></p>", "").replace("</p>", "")
        fh.write(res_title.encode("utf-8"))
        for i in content:
          fh.write(i.strip().encode("utf-8"))
      except UnicodeEncodeError as e:
        continue
      fh.close()
class contrl(threading.Thread):
  def __init__(self,urlqueue):
    threading.Thread.__init__(self)
    self.urlqueue = urlqueue
    while True:
      print ("程序正在执行")
      if self.urlqueue.empty():
        time.sleep(3)
        print ("程序执行完毕")
        exit()
if __name__ == '__main__':
  pagestart = 1
  pageend = 2
  key = "人工智能"
  get_url = get_url(key,pagestart,pageend,urlque)
  get_url.start()
  get_content = get_url_content(urlque)
  get_content.start()
  cntrol = contrl(urlque)
  cntrol.start()

希望本文所述对大家Python程序设计有所帮助。

原文链接:https://www.cnblogs.com/FRESHMANS/p/8125594.html

延伸 · 阅读

精彩推荐