This article walks through a way to download music from Baidu Music in Python using multiple threads combined with a queue. It is shared here for reference; the details follow.

I had wanted to write a music-download script for a while, and eventually picked Baidu Music as the target. After quite a bit of analysis I put together the script below. By default it only downloads the first page of search results, but it is easy to extend.

The script runs on both Windows and Linux. It depends on the BeautifulSoup library, which is used to parse the HTML.
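If BeautifulSoup is not installed yet, it is available from PyPI as beautifulsoup4 (pip install beautifulsoup4). As a quick, self-contained illustration of the kind of parsing the script relies on, here is a minimal sketch; the markup string is invented for the example and is not the real Baidu Music page:

# Minimal BeautifulSoup parsing sketch (Python 2, to match the script below).
# The markup here is made up for illustration only.
from bs4 import BeautifulSoup

markup = '<div><span class="song-title"><a>Song A</a></span></div>'
soup = BeautifulSoup(markup, 'html.parser')
for span in soup.find_all('span', class_='song-title'):
    print span.get_text()   # prints: Song A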
#!/usr/bin/python
# -*- coding: utf-8 -*-
'''
Batch-download a singer's songs from Baidu Music
(only the first page is fetched by default; extend it as you like)
@author:admin
@qq: 1243385033
'''
import threading, urllib2, os, re, sys
from bs4 import BeautifulSoup
from Queue import Queue

'''Target singer'''
SINGER = u'亚东'
'''Save path'''
SAVE_FOLDER = 'F:/music/'
# Search URL
search_url = "http://music.baidu.com/search/song?key=%s&s=1"
# Baidu music box URL
song_url = "http://box.zhangmen.baidu.com/x?op=12&count=1&mtype=1&title="

class Downloader(threading.Thread):
    def __init__(self, task):
        threading.Thread.__init__(self)
        self.task = task

    def run(self):
        '''Override the parent class's run method'''
        while True:
            url = self.task.get()
            self.download(url)
            self.task.task_done()

    def build_path(self, filename):
        join = os.path.join
        parentPath = join(SAVE_FOLDER, SINGER)
        filename = filename + '.mp3'
        myPath = join(parentPath, filename)
        return myPath

    def download(self, url):
        '''Download one file; url is a {song_name: download_url} dict'''
        sub_url = url.items()
        f_name = sub_url[0][0]
        req_url = sub_url[0][1]
        handle = urllib2.urlopen(req_url)
        # Save path
        save_path = self.build_path(f_name)
        with open(save_path, "wb") as handler:
            while True:
                chunk = handle.read(1024)
                if not chunk:
                    break
                handler.write(chunk)
        msg = u"Finished downloading from %s" % req_url
        sys.stdout.write(msg)
        sys.stdout.flush()

class HttpRequest:
    def __init__(self):
        self.task = []
        self.reg_decode = re.compile('<decode>.*?CDATA\[(.*?)\]].*?</decode>')
        self.reg_encode = re.compile('<encode>.*?CDATA\[(.*?)\]].*?</encode>')
        self.init()
        self.target_url = search_url % urllib2.quote(self.encode2utf8(SINGER))

    def encode2utf8(self, source):
        if source and isinstance(source, (str, unicode)):
            source = source.encode("utf8")
            return source
        return source

    def mkDir(self, dir_name):
        if not os.path.exists(dir_name):
            os.mkdir(dir_name)

    def init(self):
        self.mkDir(SAVE_FOLDER)
        subPath = os.path.join(SAVE_FOLDER, SINGER)
        self.mkDir(subPath)

    def http_request(self):
        '''Issue the search request and collect download URLs'''
        response = urllib2.urlopen(self.target_url)
        # Read the response body
        content = response.read()
        response.close()
        # Parse with BeautifulSoup
        html = BeautifulSoup(content, from_encoding="utf8")
        # Extract the song-title spans from the song list
        span_tag = html.find_all('div', {"monkey": "song-list"})[0].find_all('span', class_='song-title')
        # Iterate over the result list
        for a_tag in span_tag:
            song_name = unicode(a_tag.find_all("a")[0].get_text())
            # Build the request URL in a local variable so the global base
            # URL is not mutated on every iteration (the original appended
            # to the global song_url, corrupting it after the first song)
            req_url = song_url + urllib2.quote(self.encode2utf8(song_name))
            req_url = req_url + '$$' + urllib2.quote(self.encode2utf8(SINGER)) + '$$$$&url=&listenreelect=0&.r=0.1696378872729838'
            xmlfile = urllib2.urlopen(req_url)
            xml_content = xmlfile.read()
            xmlfile.close()
            url1 = re.findall(self.reg_encode, xml_content)
            url2 = re.findall(self.reg_decode, xml_content)
            if not url1 or not url2:
                continue
            url = url1[0][:url1[0].rindex('/') + 1] + url2[0]
            self.task.append({song_name: url})
        return self.task

def start_download(urls):
    # Create a queue
    queue = Queue()
    # One worker thread per URL
    size = len(urls)
    # Start the worker threads
    for _ in xrange(size):
        t = Downloader(queue)
        t.setDaemon(True)
        t.start()
    # Enqueue the download tasks
    for url in urls:
        queue.put(url)
    # Block until every task has been marked done
    queue.join()

if __name__ == '__main__':
    http = HttpRequest()
    urls = http.http_request()
    start_download(urls)
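The core of the script is a classic producer/consumer setup: http_request() produces {song_name: url} tasks, each Downloader thread pulls from the shared Queue and calls task_done(), and queue.join() unblocks once every task is finished. The script above is Python 2 only (urllib2 and Queue no longer exist under those names), so here is a minimal sketch of the same worker/queue skeleton ported to Python 3; it assumes the urls list has the same {song_name: download_url} dict shape and that the links are directly fetchable HTTP URLs:

# A minimal Python 3 sketch of the same worker/queue pattern.
# Assumptions: urls is a list of {song_name: download_url} dicts,
# as returned by http_request(), and the links are direct HTTP URLs.
import threading
import urllib.request
from queue import Queue

def worker(tasks):
    while True:
        name, url = tasks.get()
        # Stream the response to disk in 1024-byte chunks
        with urllib.request.urlopen(url) as resp, \
             open(name + '.mp3', 'wb') as out:
            while True:
                chunk = resp.read(1024)
                if not chunk:
                    break
                out.write(chunk)
        tasks.task_done()

def start_download(urls):
    tasks = Queue()
    # Fixed pool of 4 workers instead of one thread per song
    for _ in range(4):
        threading.Thread(target=worker, args=(tasks,), daemon=True).start()
    for item in urls:
        for name, url in item.items():
            tasks.put((name, url))
    tasks.join()   # returns once every task_done() has been called

One small design note: the original starts one thread per song, which is harmless for a single results page but scales poorly; a fixed-size pool, as in the sketch, keeps the number of concurrent connections bounded no matter how many pages you extend the script to fetch.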
I hope this article proves helpful to readers working on their own Python programs.