脚本之家,脚本语言编程技术及教程分享平台!
分类导航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服务器之家 - 脚本之家 - Python - Python reques接口测试框架实现代码

Python reques接口测试框架实现代码

2020-07-28 12:02安琪儿一直在 Python

这篇文章主要介绍了Python reques接口测试框架实现代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

一、框架菜单

Python reques接口测试框架实现代码

1.1 common模块

Python reques接口测试框架实现代码

1.2 其他

Python reques接口测试框架实现代码

二、Excel接口测试案例编写

Python reques接口测试框架实现代码

Python reques接口测试框架实现代码

三、读取Excel测试封装(核心封装)

excel_utils.py 读取Excel中的数据

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import os
import xlrd  #内置模块、第三方模块pip install 自定义模块
 
 
class ExcelUtils():
  def __init__(self,file_path,sheet_name):
    self.file_path = file_path
    self.sheet_name = sheet_name
    self.sheet = self.get_sheet() # 整个表格对象
 
  def get_sheet(self):
    wb = xlrd.open_workbook(self.file_path)
    sheet = wb.sheet_by_name(self.sheet_name)
    return sheet
 
  def get_row_count(self):
    row_count = self.sheet.nrows
    return row_count
 
  def get_col_count(self):
    col_count = self.sheet.ncols
    return col_count
 
  def __get_cell_value(self,row_index, col_index):
    cell_value = self.sheet.cell_value(row_index,col_index)
    return cell_value
 
  def get_merged_info(self):
    merged_info = self.sheet.merged_cells
    return merged_info
 
  def get_merged_cell_value(self,row_index, col_index):
    """既能获取普通单元格的数据又能获取合并单元格数据"""
    cell_value = None
    for (rlow, rhigh, clow, chigh) in self.get_merged_info():
      if (row_index >= rlow and row_index < rhigh):
        if (col_index >= clow and col_index < chigh):
          cell_value = self.__get_cell_value(rlow, clow)
          break; # 防止循环去进行判断出现值覆盖的情况
        else:
          cell_value = self.__get_cell_value(row_index, col_index)
      else:
        cell_value = self.__get_cell_value(row_index, col_index)
    return cell_value
 
  def get_sheet_data_by_dict(self):
    all_data_list = []
    first_row = self.sheet.row(0) #获取首行数据
    for row in range(1, self.get_row_count()):
      row_dict = {}
      for col in range(0, self.get_col_count()):
        row_dict[first_row[col].value] = self.get_merged_cell_value(row, col)
      all_data_list.append(row_dict)
    return all_data_list
 
if __name__=='__main__':
  current_path = os.path.dirname(__file__)
  excel_path = os.path.join( current_path,'..','samples/data/test_case.xlsx' )
  excelUtils = ExcelUtils(excel_path,"Sheet1")
  for row in excelUtils.get_sheet_data_by_dict():
    print( row )
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import os
from common.excel_utils import ExcelUtils
from common.config_utils import config
 
current_path = os.path.dirname(__file__)
test_data_path = os.path.join( current_path,'..', config.CASE_DATA_PATH )
 
class TestdataUtils():
  def __init__(self,test_data_path = test_data_path):
    self.test_data_path = test_data_path
    self.test_data = ExcelUtils(test_data_path,"Sheet1").get_sheet_data_by_dict()
    self.test_data_by_mysql = SqlUtils().get_mysql_test_case_info()
 
 
  def __get_testcase_data_dict(self):
    testcase_dict = {}
    for row_data in self.test_data:
      testcase_dict.setdefault( row_data['测试用例编号'],[] ).append( row_data )
    return testcase_dict
 
  def def_testcase_data_list(self):
    testcase_list = []
    for k,v in self.__get_testcase_data_dict().items():
      one_case_dict = {}
      one_case_dict["case_id"] = k
      one_case_dict["case_info"] = v
      testcase_list.append( one_case_dict )
    return testcase_list
 
 
if __name__=="__main__":
  testdataUtils = TestdataUtils()
  for i in testdataUtils.def_testcase_data_list():
    print( i )

testdata_utils.py 读取Excel中的数据后处理成需要的数据

四、request封装(核心封装)

requests_utils.py 包含post请求,get请求,异常,调用断言

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import ast
import re
import requests
import jsonpath
from requests.exceptions import RequestException
from requests.exceptions import ProxyError
from requests.exceptions import ConnectionError
from common.config_utils import config
from common.check_utils import CheckUtils
 
class RequestsUtils():
  def __init__(self):
    self.hosts = config.hosts
    self.headers = {"ContentType":"application/json;charset=utf-8"}
    self.session = requests.session()
    self.temp_variables = {}
 
  def __get(self,get_info):
    try:
      url = self.hosts + get_info["请求地址"]
      response = self.session.get( url = url,
                     params = ast.literal_eval(get_info["请求参数(get)"])
                     )
      response.encoding = response.apparent_encoding
      if get_info["取值方式"] == "json取值":
        value = jsonpath.jsonpath( response.json(),get_info["取值代码"] )[0]
        self.temp_variables[ get_info["传值变量"] ] = value
      elif get_info["取值方式"] == "正则取值":
        value = re.findall(get_info["取值代码"],response.text)[0]
        self.temp_variables[get_info["传值变量"]] = value
      result = CheckUtils(response).run_check(get_info['期望结果类型'], get_info['期望结果'])
    except ProxyError as e:
      result = {'code': 4, 'result': '[%s]请求:代理错误异常' % (get_info["接口名称"])}
    except ConnectionError as e:
      result = {'code': 4, 'result': '[%s]请求:连接超时异常' % (get_info["接口名称"])}
    except RequestException as e:
      result = {'code': 4, 'result': '[%s]请求:Request异常,原因:%s' % (get_info["接口名称"], e.__str__())}
    except Exception as e:
      result = {'code':4,'result':'[%s]请求:系统异常,原因:%s'%(get_info["接口名称"],e.__str__())}
    return result
 
  def __post(self,post_info):
    try:
      url = self.hosts + post_info["请求地址"]
      response = self.session.post( url = url,
                     headers = self.headers,
                     params = ast.literal_eval(post_info["请求参数(get)"]),
                    # data = post_infos["提交数据(post)"],
                     json=ast.literal_eval(post_info["提交数据(post)"])
                    )
      response.encoding = response.apparent_encoding
      if post_info["取值方式"] == "json取值":
        value = jsonpath.jsonpath( response.json(),post_info["取值代码"] )[0]
        self.temp_variables[ post_info["传值变量"] ] = value
      elif post_info["取值方式"] == "正则取值":
        value = re.findall(post_info["取值代码"],response.text)[0]
        self.temp_variables[post_info["传值变量"]] = value
      #调用CheckUtils()
      result = CheckUtils(response).run_check(post_info['期望结果类型'],post_info['期望结果'])
    except ProxyError as e:
      result = {'code': 4, 'result': '[%s]请求:代理错误异常' % (post_info["接口名称"])}
    except ConnectionError as e:
      result = {'code': 4, 'result': '[%s]请求:连接超时异常' % (post_info["接口名称"])}
    except RequestException as e:
      result = {'code': 4, 'result': '[%s]请求:Request异常,原因:%s' % (post_info["接口名称"], e.__str__())}
    except Exception as e:
      result = {'code':4,'result':'[%s]请求:系统异常,原因:%s'%(post_info["接口名称"],e.__str__())}
    return result
 
  def request(self,step_info):
    try:
      request_type = step_info["请求方式"]
      param_variable_list = re.findall('\\${\w+}', step_info["请求参数(get)"])
      if param_variable_list:
        for param_variable in param_variable_list:
          step_info["请求参数(get)"] = step_info["请求参数(get)"]\
            .replace(param_variable,'"%s"' % self.temp_variables.get(param_variable[2:-1]))
      if request_type == "get":
        result = self.__get( step_info )
      elif request_type == "post":
        data_variable_list = re.findall('\\${\w+}', step_info["提交数据(post)"])
        if data_variable_list:
          for param_variable in data_variable_list:
            step_info["提交数据(post)"] = step_info["提交数据(post)"] \
              .replace(param_variable, '"%s"' % self.temp_variables.get(param_variable[2:-1]))
        result = self.__post( step_info )
      else:
        result = {'code':1,'result':'请求方式不支持'}
    except Exception as e:
      result = {'code':4,'result':'用例编号[%s]的[%s]步骤出现系统异常,原因:%s'%(step_info['测试用例编号'],step_info["测试用例步骤"],e.__str__())}
    return result
 
  def request_by_step(self,step_infos):
    self.temp_variables = {}
    for step_info in step_infos:
      temp_result = self.request( step_info )
      # print( temp_result )
      if temp_result['code']!=0:
        break
    return temp_result
 
 
if __name__=="__main__":
 
  case_info = [
    {'请求方式': 'get', '请求地址': '/cgi-bin/token', '请求参数(get)': '{"grant_type":"client_credential","appid":"wxXXXXXxc16","secret":"XXXXXXXX"}', '提交数据(post)': '', '取值方式': 'json取值', '传值变量': 'token', '取值代码': '$.access_token', '期望结果类型': '正则匹配', '期望结果': '{"access_token":"(.+?)","expires_in":(.+?)}'},
    {'请求方式': 'post', '请求地址': '/cgi-bin/tags/create', '请求参数(get)': '{"access_token":${token}}', '提交数据(post)': '{"tag" : {"name" : "衡东"}}','取值方式': '无', '传值变量': '', '取值代码': '', '期望结果类型': '正则匹配', '期望结果': '{"tag":{"id":(.+?),"name":"衡东"}}'}
  ]
  RequestsUtils().request_by_step(case_info)

五、断言封装(核心封装)

check_utils.py 断言封装,与实际结果核对

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import re
import ast
 
class CheckUtils():
  def __init__(self,check_response=None):
    self.ck_response=check_response
    self.ck_rules = {
      '无': self.no_check,
      'json键是否存在': self.check_key,
      'json键值对': self.check_keyvalue,
      '正则匹配': self.check_regexp
    }
    self.pass_result = {
      'code': 0,
      'response_reason': self.ck_response.reason,
      'response_code': self.ck_response.status_code,
      'response_headers': self.ck_response.headers,
      'response_body': self.ck_response.text,
      'check_result': True,
      'message': '' # 扩招作为日志输出等
    }
    self.fail_result = {
      'code': 2,
      'response_reason': self.ck_response.reason,
      'response_code': self.ck_response.status_code,
      'response_headers': self.ck_response.headers,
      'response_body': self.ck_response.text,
      'check_result': False,
      'message': '' # 扩招作为日志输出等
    }
 
 
  def no_check(self):
    return self.pass_result
 
 
  def check_key(self,check_data=None):
    check_data_list = check_data.split(','#把需要判断的值做切割,取出键值
    res_list = [] #存放每次比较的结果
    wrong_key = [] #存放比较失败key
    for check_data in check_data_list:  #把切割的键值和取出响应结果中的所有的键一个一个对比
      if check_data in self.ck_response.json().keys():
        res_list.append(self.pass_result )
      else:
        res_list.append( self.fail_result )
        wrong_key.append(check_data)   #把失败的键放进来,便于后续日志输出
    # print(res_list)
    # print(wrong_key)
    if self.fail_result in res_list:
      return self.fail_result
    else:
      return self.pass_result
 
  def check_keyvalue(self,check_data=None):
    res_list = [] # 存放每次比较的结果
    wrong_items = [] # 存放比较失败 items
    for check_item in ast.literal_eval(check_data).items(): #literal_eval()安全性的把字符串转成字典,items()取出键值对
      if check_item in self.ck_response.json().items():
        res_list.append( self.pass_result )
      else:
        res_list.append( self.fail_result )
        wrong_items.append(check_item)
    # print( res_list )
    # print( wrong_items )
 
    if self.fail_result in res_list:
      return self.fail_result
    else:
      return self.pass_result
 
  def check_regexp(self,check_data=None):
    pattern = re.compile(check_data)
    if re.findall(pattern=pattern,string=self.ck_response.text): #匹配到了,不为空,为true
      return self.pass_result
    else:
      return self.fail_result
 
  def run_check(self,check_type=None,check_data=None):
    code = self.ck_response.status_code
    if code == 200:
      if check_type in self.ck_rules.keys():
        result=self.ck_rules[check_type](check_data)
        return result
      else:
        self.fail_result['message'] = '不支持%s判断方法'%check_type
        return self.fail_result
    else:
      self.fail_result['message'] = '请求的响应状态码非%s'%str(code)
      return self.fail_result
 
 
 
 
if __name__=="__main__":
  # 检查键是否存在,{"access_token":"hello","expires_":7200} 设为响应结果,"access_token,expires_in" 为检查对象值
  CheckUtils({"access_token":"hello","expires_":7200}).check_key("access_token,expires_in")
  #检查键值对是否存在
  CheckUtils({"access_token":"hello","expires_i":7200}).check_keyvalue('{"expires_in": 7200}')
  #正则对比
  #TURE
  print(CheckUtils('{"access_token":"hello","expires_in":7200}').check_regexp('"expires_in":(.+?)'))
  #False
  print(CheckUtils('{"access_token":"hello","expires":7200}').check_regexp('"expires_in":(.+?)'))

六、api_testcase下的api_test.py 封装

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import warnings
import unittest
import paramunittest
from common.testdata_utils import TestdataUtils
from common.requests_utils import RequestsUtils
 
#如果是mysql数据源的话切换成 def_testcase_data_list_by_mysql()  exccel数据源:def_testcase_data_list()
 
case_infos = TestdataUtils().def_testcase_data_list_by_mysql()
 
@paramunittest.parametrized(
  *case_infos
)
 
class APITest(paramunittest.ParametrizedTestCase):
  def setUp(self) -> None:
    warnings.simplefilter('ignore', ResourceWarning) #不会弹出警告提示
 
  def setParameters(self, case_id, case_info):
    self.case_id = case_id
    self.case_info = case_info
 
  def test_api_common_function(self):
    '''测试描述'''
    self._testMethodName = self.case_info[0].get("测试用例编号")
    self._testMethodDoc = self.case_info[0].get("测试用例名称")
    actual_result = RequestsUtils().request_by_step(self.case_info)
    self.assertTrue( actual_result.get('check_result'),actual_result.get('message') )
 
if __name__ == '__main__':
  unittest.main()

七、common下的log_utils.py 封装

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import os
import logging
import time
from common.config_utils import config
 
current_path = os.path.dirname(__file__)
log_output_path = os.path.join( current_path,'..', config.LOG_PATH )
 
class LogUtils():
  def __init__(self,log_path=log_output_path):
    self.log_name = os.path.join( log_output_path ,'ApiTest_%s.log'%time.strftime('%Y_%m_%d') )
    self.logger = logging.getLogger("ApiTestLog")
    self.logger.setLevel( config.LOG_LEVEL )
 
    console_handler = logging.StreamHandler() # 控制台输出
    file_handler = logging.FileHandler(self.log_name,'a',encoding='utf-8') # 文件输出
    formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
    console_handler.setFormatter(formatter)
    file_handler.setFormatter(formatter)
 
    self.logger.addHandler( console_handler )
    self.logger.addHandler( file_handler )
 
    console_handler.close() # 防止打印日志重复
    file_handler.close()   # 防止打印日志重复
 
  def get_logger(self):
    return self.logger
 
logger = LogUtils().get_logger()  # 防止打印日志重复
 
if __name__ == '__main__':
  logger.info('hello')

八、common下的config_utils.py的封装

配置文件的编写:

Python reques接口测试框架实现代码

对配置文件的读取封装:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import os
import configparser
 
current_path = os.path.dirname(__file__)
cfgpath = os.path.join(current_path, "../conf/local_config.ini")
print(cfgpath)
 
 
class ConfigUtils:
  def __init__(self,config_path=cfgpath):
    self.__conf=configparser.ConfigParser()
    self.__conf.read(config_path, encoding="utf-8")
 
  def read_ini(self,sec,option):
    value=self.__conf.get(sec,option)
    return value
 
  @property
  def hosts(self):
    value=self.read_ini('default','hosts')
    return value
 
  @property
  def LOG_PATH(self):
    value = self.read_ini('path', 'LOG_PATH')
    return value
 
  @property
  def CASE_DATA_PATH(self):
    value = self.read_ini('path', 'CASE_DATA_PATH')
    return value
 
  @property
  def REPORT_PATH(self):
    value = self.read_ini('path', 'REPORT_PATH')
    return value
 
  @property
  def LOG_LEVEL(self):
    value = int(self.read_ini('log', 'LOG_LEVEL'))
    return value
 
 
  @property
  def smtp_server(self):
    smtp_server_value = self.read_ini('email', 'smtp_server')
    return smtp_server_value
 
  @property
  def smtp_sender(self):
    smtp_sender_value = self.read_ini('email', 'smtp_sender')
    return smtp_sender_value
 
  @property
  def smtp_password(self):
    smtp_password_value = self.read_ini('email', 'smtp_password')
    return smtp_password_value
 
  @property
  def smtp_receiver(self):
    smtp_receiver_value = self.read_ini('email', 'smtp_receiver')
    return smtp_receiver_value
 
  @property
  def smtp_cc(self):
    smtp_cc_value = self.read_ini('email', 'smtp_cc')
    return smtp_cc_value
 
  @property
  def smtp_subject(self):
    smtp_subject_value = self.read_ini('email', 'smtp_subject')
    return smtp_subject_value
config=ConfigUtils()
 
if __name__=='__main__':
  current_path = os.path.dirname(__file__)
  cfgpath = os.path.join(current_path, "../conf/local_config.ini")
  config_u=ConfigUtils()
  print(config_u.hosts)
  print(config_u.LOG_LEVEL)

九、test_runner下的run_case.py 封装

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class RunCase():
  def __init__(self):
    self.test_case_path = test_case_path
    self.report_path = test_report_path
    self.title = 'P1P2接口自动化测试报告'
    self.description = '自动化接口测试框架'
    self.tester = '测试开发组'
  def load_test_suite(self):
    discover = unittest.defaultTestLoader.discover(start_dir=self.test_case_path,
                            pattern='api_test.py',
                            top_level_dir=self.test_case_path)
    all_suite = unittest.TestSuite()
    all_suite.addTest( discover )
    return all_suite
  def run(self):
    report_dir = HTMLTestReportCN.ReportDirectory(self.report_path)
    report_dir.create_dir(self.title)
    report_file_path = HTMLTestReportCN.GlobalMsg.get_value('report_path')
    fp = open( report_file_path ,'wb' )
    runner = HTMLTestReportCN.HTMLTestRunner(stream=fp,
                         title=self.title,
                         description=self.description,
                         tester=self.tester)
    runner.run( self.load_test_suite() )
    fp.close()
    return report_file_path
 
 
if __name__=='__main__':
  report_path = RunCase().run()
  EmailUtils(open(report_path, 'rb').read(), report_path).send_mail()

十、common下的email_utils.py 封装

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import os
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from common.config_utils import config
 
class EmailUtils():
  def __init__(self,smtp_body,smtp_attch_path=None):
    self.smtp_server = config.smtp_server
    self.smtp_sender = config.smtp_sender
    self.smtp_password = config.smtp_password
    self.smtp_receiver = config.smtp_receiver
    self.smtp_cc = config.smtp_cc
    self.smtp_subject = config.smtp_subject
    self.smtp_body = smtp_body
    self.smtp_attch = smtp_attch_path
 
  def mail_message_body(self):
    message = MIMEMultipart()
    message['from'] = self.smtp_sender
    message['to'] = self.smtp_receiver
    message['Cc'] = self.smtp_cc
    message['subject'] = self.smtp_subject
    message.attach( MIMEText(self.smtp_body,'html','utf-8') )
    if self.smtp_attch:
      attach_file = MIMEText(open(self.smtp_attch, 'rb').read(), 'base64', 'utf-8')
      attach_file['Content-Type'] = 'application/octet-stream'
      attach_file.add_header('Content-Disposition', 'attachment', filename=('gbk', '', os.path.basename(self.smtp_attch)))
      message.attach(attach_file)
    return message
 
  def send_mail(self):
    smtp = smtplib.SMTP()
    smtp.connect(self.smtp_server)
    smtp.login(user=self.smtp_sender, password=self.smtp_password)
    smtp.sendmail(self.smtp_sender,self.smtp_receiver.split(",")+ self.smtp_cc.split(","), self.mail_message_body().as_string())
 
if __name__=='__main__':
  html_path = os.path.dirname(__file__) + '/../test_reports/接口自动化测试报告V1.1/接口自动化测试报告V1.1.html'
  EmailUtils('<h3 align="center">自动化测试报告</h3>',html_path).send_mail()

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://www.cnblogs.com/123anqier-blog/p/13376455.html

延伸 · 阅读

精彩推荐