首先来看实例代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# -*- coding:utf-8 -*- import requests import datetime import time import threading ''' allow_redirects = False禁止重定向,添加在request参数后 get请求用params传参 post请求,数据类型form,用data传参 post请求,数据类型form,用data传参 post请求,数据类型json,json传参 timeout:请求超时时间,添加在request参数后 nub = 10#设置并发线程数 ResponseTime=float(result.elapsed.microseconds)/1000 #获取响应时间,单位ms ThinkTime = 0.5#设置思考时间 AverageTime = "{:.3f}".format(float(sum(myrequest.times))/float(len(myrequest.times)))#计算数组的平均值,保留3位小数 totaltime = float(hour)*60*60 + float(minute)*60 + float(second) #计算总的思考时间+请求时间 ''' class url_request: times = [] error = [] def weather_DC( self ): myrequest = url_request() weatherinfo_search = 'https://restapi.amap.com/v3/weather/weatherInfo?parameters' params = { 'key' : 'cd1b11e80ffac05253196aa2a1233f25' , 'city' : 110101 , 'extensions' : 'base' , 'output' : 'JSON' } result = requests.get(url = weatherinfo_search, params = params) print ( "状态码:" ,result.status_code) print ( "返回报文:" ,result.text) ResponseTime = float (result.elapsed.microseconds) / 1000 myrequest.times.append(ResponseTime) if result.status_code ! = 200 : myrequest.error.append( "0" ) if __name__ = = '__main__' : myrequest = url_request() threads = [] starttime = datetime.datetime.now() print ( "请求开始时间:request start time %s" % starttime) nub = 10 ThinkTime = 0.5 for i in range ( 1 , nub + 1 ): t = threading.Thread(target = myrequest.weather_DC()) threads.append(t) for t in threads: time.sleep(ThinkTime) print ( "线程数:thread %s" % t) t.setDaemon( True ) t.start() t.join() endtime = datetime.datetime.now() print ( "请求结束时间:request end time %s" % endtime) time.sleep( 3 ) AverageTime = "{:.3f}" . format ( float ( sum (myrequest.times)) / float ( len (myrequest.times))) print ( "平均响应时间:Average Response Time %s ms" % AverageTime) usetime = str (endtime - starttime) hour = usetime.split( ':' ).pop( 0 ) minute = usetime.split( ':' ).pop( 1 ) second = usetime.split( ':' ).pop( 2 ) totaltime = float (hour) * 60 * 60 + float (minute) * 60 + float (second) print ( "并发数:Concurrent processing %s" % nub) print ( "#总共消耗的时间:use total time %s s" % (totaltime - float (nub * ThinkTime))) print ( "错误请求数:fail request %s s" % myrequest.error.count( "0" )) |
实例扩展:
利用ruquest发送请求,利用多线程模拟并发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
#!/user/bin/env python #coding=utf-8 import requests import datetime import time import threading class url_request(): times = [] error = [] def req( self ,AppID,url): myreq = url_request() headers = { 'User-Agent' : 'Mozilla/5.0 (Linux; Android 4.2.1; en-us; Nexus 4 Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19' } payload = { 'AppID' :AppID, 'CurrentURL' :url} r = requests.post( "http://xx.xxx.com/WeiXinJSAccessToken/json/WeChatJSTicket" ,headers = headers,data = payload) ResponseTime = float (r.elapsed.microseconds) / 1000 #获取响应时间,单位ms myreq.times.append(ResponseTime) #将响应时间写入数组 if r.status_code ! = 200 : myreq.error.append( "0" ) if __name__ = = '__main__' : myreq = url_request() threads = [] starttime = datetime.datetime.now() print "request start time %s" % starttime nub = 50 #设置并发线程数 ThinkTime = 0.5 #设置思考时间 for i in range ( 1 , nub + 1 ): t = threading.Thread(target = myreq.req, args = ( '12' , 'http://m.ctrip.com/webapp/cpage/#mypoints' )) threads.append(t) for t in threads: time.sleep(ThinkTime) #print "thread %s" %t #打印线程 t.setDaemon( True ) t.start() t.join() endtime = datetime.datetime.now() print "request end time %s" % endtime time.sleep( 3 ) AverageTime = "{:.3f}" . format ( float ( sum (myreq.times)) / float ( len (myreq.times))) #计算数组的平均值,保留3位小数 print "Average Response Time %s ms" % AverageTime #打印平均响应时间 usetime = str (endtime - starttime) hour = usetime.split( ':' ).pop( 0 ) minute = usetime.split( ':' ).pop( 1 ) second = usetime.split( ':' ).pop( 2 ) totaltime = float (hour) * 60 * 60 + float (minute) * 60 + float (second) #计算总的思考时间+请求时间 print "Concurrent processing %s" % nub #打印并发数 print "use total time %s s" % (totaltime - float (nub * ThinkTime)) #打印总共消耗的时间 print "fail request %s" % myreq.error.count( "0" ) #打印错误请求数 |
1
2
3
4
5
6
|
request start time 2015 - 02 - 10 18 : 24 : 14.316000 request end time 2015 - 02 - 10 18 : 24 : 39.769000 Average Response Time 46.700 ms Concurrent processing 50 use total time 25.453 s fail request 1 |
到此这篇关于Python3接口性能测试实例代码的文章就介绍到这了,更多相关Python3实现简单的接口性能测试内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://www.cnblogs.com/jiaown123/p/14902136.html