python对于requests的封装方法详解
作者:可昌 发布时间:2023-02-06 15:29:48
标签:python,requests,封装
由于requests是http类接口的核心,因此封装 * 虑问题比较多:
1. 对多种接口类型的支持;
2. 连接异常时能够重连;
3. 并发处理的选择;
4. 使用方便,容易维护;
当前并未全部实现,后期会不断完善。重点提一下并发处理的选择:python的并发处理机制由于存在GIL的原因,实现起来并不是很理想,综合考虑多进程、多线程、协程,在不考虑大并发性能测试的前提下使用了多线程-线程池的形式实现。使用的是
concurrent.futures模块。当前仅方便支持webservice接口。
# -*- coding:utf-8 -*-
import requests
from concurrent.futures import ThreadPoolExecutor
from Tools.Config import Config # 配置文件读取
from Tools.Log import Log # 日志管理
from Tools.tools import decoLOG # 日志装饰
'''
功能: Requests类
使用方法:
作者: 郭可昌
作成时间: 20180224
更新内容:
更新时间:
'''
class Requests(object):
def __init__(self):
self.session = requests.session()
self.header = {}
# URL默认来源于配置文件,方便不同测试环境的切换,也可以动态设定
self.URL = Config().getURL()
# 默认60s,可以动态设定
self.timeout = 60
#http连接异常的场合,重新连接的次数,默认为3,可以动态设定
self.iRetryNum = 3
self.errorMsg = ""
# 内容 = {用例编号:响应数据}
self.responses = {}
# 内容 = {用例编号:异常信息}
self.resErr={}
# 原始post使用保留
# bodyData: request's data
@decoLOG
def post(self, bodyData):
response = None
self.errorMsg = ""
try:
response = self.session.post(self.URL, data=bodyData.encode('utf-8'), headers=self.header, timeout=self.timeout)
response.raise_for_status()
except Exception as e:
self.errorMsg = str(e)
Log().logger.error("HTTP请求异常,异常信息:%s" % self.errorMsg)
return response
# 复数请求并发处理,采用线程池的形式,用例数>线程池的容量:线程池的容量为并发数,否则,用例数为并发数
# dicDatas: {用例编号:用例数据}
@decoLOG
def req_all(self, dicDatas, iThreadNum=5):
if len(dict(dicDatas)) < 1:
Log().logger.error("没有测试对象,请确认后再尝试。。。")
return self.responses.clear()
# 请求用例集合转换(用例编号,用例数据)
seed = [i for i in dicDatas.items()]
self.responses.clear()
# 线程池并发执行,iThreadNum为并发数
with ThreadPoolExecutor(iThreadNum) as executor:
executor.map(self.req_single,seed)
# 返回所有请求的响应信息({用例编号:响应数据}),http连接异常:对应None
return self.responses
# 用于单用例提交,http连接失败可以重新连接,最大重新连接数可以动态设定
def req_single(self, listData, reqType="post", iLoop=1):
response = None
# 如果达到最大重连次数,连接后提交结束
if iLoop == self.iRetryNum:
if reqType == "post":
try:
response = requests.post(self.URL, data=listData[1].encode('utf-8'), headers=self.header,
timeout=self.timeout)
response.raise_for_status()
except Exception as e:
# 异常信息保存只在最大连接次数时进行,未达到最大连接次数,异常信息为空
self.resErr[listData[0]] = str(e)
Log().logger.error("HTTP请求异常,异常信息:%s【%d】" % (str(e), iLoop))
self.responses[listData[0]] = response
else:
# for future: other request method expand
pass
# 未达到最大连接数,如果出现异常,则重新连接尝试
else:
if reqType == "post":
try:
response = requests.post(self.URL, data=listData[1].encode('utf-8'), headers=self.header,
timeout=self.timeout)
response.raise_for_status()
except Exception as e:
Log().logger.error("HTTP请求异常,异常信息:%s【%d】" % (str(e), iLoop))
# 重连次数递增
iLoop += 1
# 进行重新连接
self.req_single(listData, reqType, iLoop)
# 当前连接终止
return None
self.responses[listData[0]] = response
else:
# for future: other request method expand
pass
# 设定SoapAction, 快捷完成webservice接口header设定
def setSoapAction(self, soapAction):
self.header["SOAPAction"] = soapAction
self.header["Content-Type"] = "text/xml;charset=UTF-8"
self.header["Connection"] = "Keep-Alive"
self.header["User-Agent"] = "InterfaceAutoTest-run"
来源:https://blog.csdn.net/fatalxx/article/details/79384056
0
投稿
猜你喜欢
- 1.什么是Pillow首先我们需要了解一下PIL(Python Imaging Library),它是Python2中非常强大的图像处理标准
- 利用Python处理数据时,处理完成后输出结果为二维的列表,如果我们想把这个列表输出到Excel中形成格式化的数据,其实和输出到TXT文件大
- 问题描述给出一个整数数组 nums,请返回其中位数为偶数的数字的个数。示例 1:输入:nums = [12,345,2,6,7896]输出:
- 前言需求是将两个list同时进行遍历,然后同步的将每个元素add到一个dict中,虽然有麻烦的方式,比如直接用list的数组下标可以实现,但
- 以Python 3.x版本为主一、条件语句条件语句基本结构0或null为false,其余则为trueif 判定条件:语句块...可多行els
- 我们经常会在登录一个网站的时候被引导页挡住前进的脚步,这一点在上个世纪到本世纪初的网站中尤其明显,特别是在企业网站里,几乎每个企业网站都会有
- 一、什么是集合二、集合的创建方式集合中的元素不能重复#地点:湖北武汉#姓名:学工科的皮皮志#开发时间:2022/2/27 19:
- 没事在这里发一下关于数据库大批量插入数据的效率对比,用ACCESS和MSSQL,数值是在本机测试,根据不同的环境和配置,数值可能会有较大差别
- functools模块提供了一些常用的高阶函数(处理其他可调用对象/函数的特殊函数;以函数作为输入参数,返回也是函数)。functools模
- 大家可能都不大熟悉关于pdb这个模块,实际上就是python中的内置模块,主要作用于命令行调试代码,下面我们将通过是哪个小结给大家详细介绍下
- 1. 线性表简介线性表是一种线性结构,它是由零个或多个数据元素构成的有限序列。线性表的特征是在一个序列中,除了头尾元素,每个元素都有且只有一
- python 爬虫解决403禁止访问错误在Python写爬虫的时候,html.getcode()会遇到403禁止访问的问题,这是网站对自动化
- 首先,在数据库中创建一个表,用于存放图片:CREATE TABLE Images(Id INT PRIMARY KEY AUTO_INCRE
- 主程序mainaddfunc.pyfrom flask import Flask, render_template, request, ur
- 简介在很多实际的项目开发中,我们需要实现很多实时功能;而在这篇文章中,我们就利用django channels简单地实现了点对点聊天和消息推
- 重载:同一个类中,函数名一样,返回值或者参数类型,个数不一样的叫做重载。 覆盖:同名函数,同返回值类型,同参数的叫做覆盖。指的是子类对父类中
- 高级语言不能直接被机器所理解执行,所以都需要一个翻译的阶段,解释型语言用到的是解释器,编译型语言用到的是编译器。编译型语言通常的执行过程是:
- 1. lock互斥锁知识点:lock.acquire()# 上锁lock.release()# 解锁#同一时间允许一个进程上一把锁 就是Lo
- 在实际的工作和学习中,许多人的SQL Server 2005数据库日志文件可能会发生损坏,例如硬件故障、计算机非正常重启或关机等等。在SQL
- 网站上的Banner条,是网站用来作为盈利或者是发布一些重要的信息的工具。但是它又不能作为网页的主要内容,因为它的主要目的是吸引人的注意力,