【实战练习】接口自动化测试
【实战练习】接口自动化测试
实战需求
实现思路
代码实现
Python 版本
wework.py 文件
添加多环境切换的配置。
def get_config_env(self):
'''
多环境切换
:return:
'''
env = os.getenv('env', default='test')
file_path = f"{Utils.get_root_path()}/config/{env}.yaml"
...
department.py 文件
添加数据清理。
def clear(self):
'''
清理除去父部门之外,已经存在的部门
:return:
'''
# 发出查询子部门 id 接口请求
r = self.get_ids()
# 获取所有部门的 id
ids = jsonpath(r.json(), "$..id")
for i in ids:
if i != 1:
# 调用删除部门接口
self.delete(i)
member.py 文件
封装成员管理查询接口封装。
class Member(WeWork):
def __init__(self):
super().__init__()
self.corpsecret = self.data.get("corpsecret").get("contacts")
self.token = self.get_token(self.corpsecret)
def get_user(self):
# https://qyapi.weixin.qq.com/cgi-bin/user/list_id?access_token=ACCESS_TOKEN
req = {
"method": "POST",
"url": f"{self.base_url}/cgi-bin/user/list_id?access_token={self.token}",
}
r = self.send(req)
return r
test_member.py 文件
测试查询成员 ID 信息接口,使用复杂断言优化。
@allure.feature("成员管理")
class TestMember:
def setup_class(self):
self.member = Member()
# 获取成员id结果
self.r = self.member.get_user()
# 接口响应示例
self.user_ids_data = {...}
@allure.story("使用 JSONPATH 断言查询结果")
def test_get_member_by_jsonpath(self):
# 使用 jsonpath 获取 ids
ids = jsonpath(self.r.json(), "$..userid")
assert "xing01" in ids
assert len(ids) == 5
@allure.story("使用 JSONSchema 断言查询结果")
def test_get_member_by_jsonschema(self):
'''
jsonschema 简单验证
:return:
'''
user_ids_jsonschema = Utils.generate_jsonschema(self.user_ids_data)
# print("jsonschema的结构为:", user_ids_jsonschema)
logger.info(f"JSONSchema 的结构为:{user_ids_jsonschema}")
res = Utils.validate_schema(self.r.json(), user_ids_jsonschema)
assert res
@allure.story("使用 JSONSchema 文件断言查询结果")
def test_member_by_jsonchema_file(self):
'''
jsonschema 文件验证
:return:
'''
# 输出到data目录中
file_path = f"{Utils.get_root_path()}/data/user_info.json"
Utils.generate_jsonschema_by_file(self.user_ids_data, file_path)
res = Utils.validate_schema_by_file(self.r.json(), file_path)
assert res
Utils.py 文件
工具类增加 JSONSchema 的配置和数据库的配置。
@classmethod
def generate_jsonschema(cls, obj):
'''
生成 jsonschema 的数据
:param obj:
:return:
'''
# pip install genson
builder = SchemaBuilder()
builder.add_object(obj)
return builder.to_schema()
@classmethod
def validate_schema(cls, data_obj, schema):
'''
通过 schema 验证数据
:return:
'''
try:
validate(data_obj, schema=schema)
return True
except Exception as e:
print(f"结构体验证失败,失败原因是{e}")
return False
@classmethod
def generate_jsonschema_by_file(cls, obj, file_path):
'''
生成 schema 数据保存到文件中
:return:
'''
jsonschema_data = cls.generate_jsonschema(obj)
with open(file_path, "w") as f:
json.dump(jsonschema_data, f)
@classmethod
def validate_schema_by_file(cls, data_obj, schema_file):
'''
通过文件验证数据
:param data_obj:
:param schema:
:return:
'''
with open(schema_file) as f:
schema_data = json.load(f)
return cls.validate_schema(data_obj, schema_data)
@classmethod
def get_conn(cls):
conn = pymysql.connect(
host="127.0.0.1",
port=3306,
user="root",
password="123456",
database="test_wework")
return conn
# 执行 sql 语句
@classmethod
def execute_sql(cls, query):
# 获取数据库连接对象 使用 with 语句块可以确保执行完毕后正确关闭数据库
with cls.get_conn() as connect:
try:
# 创建游标对象
with connect.cursor() as cursor:
# 执行 sql 语句
cursor.execute(query)
# 获取所有记录
record = cursor.fetchall()
return record
except pymysql.Error as e:
print(f"SQL 执行错误信息为:{e}")
return None
wework.py 文件
获取 token 步骤优化:
def get_token_by_file(self, key):
'''
从文件读取配置
:param key: 应用名
:return:
'''
# 获取 secret
secret = self.data.get('corpsecret').get(key)
# 拿到文件路径
file_path = f'{self.path}/data/token.yaml'
# 获取 token info 的信息 time_stamp 和 access_token
try:
token_data = Utils.get_yaml_data(file_path).get(key, {})
time_stamp = token_data.get('time_stamp')
access_token = token_data.get('access_token')
except Exception as e:
logger.info(f"获取数据出错,错误原因为:{e}")
return e
# 获取时间差
time_step = int(time.time()) - time_stamp
# 判断 token 是否存在 以及时间戳是否过期
if access_token is None or time_step >= 7200:
new_token = self.get_token(secret)
# 写入新数据
token_data.update({"time_stamp": int(time.time()), "access_token": new_token})
Utils.add_yaml_data(token_data, file_path)
# 返回新的token
return new_token
else:
# 返回已有token
return access_token
Java 版本
成员添加测试用例
@Slf4j
public class WeworkMember extends Wework{
String SECRET;
public WeworkMember() {
// 读取配置
super();
SECRET = config.getContactsSecret();
}
/**
* 获取成员id列表
* @return
*/
public List<String> getMemberIds() {
String text = given()
.queryParams("access_token", getToken(SECRET))
.when()
.post("/user/list_id")
.then()
.extract().response().getBody().asString();
return JsonPath.read(text, "$..userid");
}
}
@BeforeAll
static void setUp() throws IOException {
weworkMember = new WeworkMember();
}
@Test
void getMemberIdsTest(){
assertThat(weworkMember.getMemberIds(),hasItem("ChenGengPeng"));
}
多环境切换
public class ConfigUtil {
/**
* 获取项目配置文件
*
* @return 配置实体类
*/
public static ConfigEntity getConfig() {
try {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
HashMap<String,Object> configmap = mapper.readValue(ConfigEntity.class.getClassLoader().getResourceAsStream("config.yaml"), new TypeReference<HashMap<String,Object>>(){});
String filePath = "config-" + configmap.get("env") + ".yaml";
return mapper.readValue(ConfigEntity.class.getClassLoader().getResourceAsStream(filePath), ConfigEntity.class);
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}
env: test
baseUrl: "https://qyapi.weixin.qq.com/cgi-bin"
corpId: "ww7f7d94d8b17ac268"
contactsSecret: "yFxjxy0uHfAl9QIjjAgwTiMTdjq16z-K9eSRfNZvqV4"
#proxy:
# host: "localhost"
# port: 8888
缓存token关键代码
@Override
public Response filter(FilterableRequestSpecification requestSpec, FilterableResponseSpecification responseSpec, FilterContext filterContext) {
// 从缓存中获取token
ObjectMapper mapper = new ObjectMapper();
String secret = requestSpec.getRequestParams().get(SECRET_PARAM_NAME);
TokenEntity tokenEntity = tokenCache.load(secret);
// 如果缓存存在则直接返回token
if (tokenEntity != null) {
try {
return new ResponseBuilder()
.setStatusCode(200)
.setContentType("application/json")
.setBody(mapper.writeValueAsString(tokenEntity))
.build();
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
Response response = filterContext.next(requestSpec, responseSpec);
TokenEntity te = response.body().as(TokenEntity.class);
// 写入缓存
tokenCache.dump(secret,te,te.getExpiresIn());
return response;
}