Skip to content

【实战练习】接口自动化测试

【实战练习】接口自动化测试

实战需求

点击查看实战详情

实现思路

uml diagram

代码实现

Python 版本

wework.py 文件

添加多环境切换的配置。

def get_config_env(self):
    '''
    多环境切换
    :return:
    '''
    env = os.getenv('env', default='test')
    file_path = f"{Utils.get_root_path()}/config/{env}.yaml"
    ...

department.py 文件

添加数据清理。

def clear(self):
    '''
    清理除去父部门之外,已经存在的部门
    :return:
    '''
    # 发出查询子部门 id 接口请求
    r = self.get_ids()
    # 获取所有部门的 id
    ids = jsonpath(r.json(), "$..id")
    for i in ids:
        if i != 1:
            # 调用删除部门接口
            self.delete(i)

member.py 文件

封装成员管理查询接口封装。

class Member(WeWork):
    def __init__(self):
        super().__init__()
        self.corpsecret = self.data.get("corpsecret").get("contacts")
        self.token = self.get_token(self.corpsecret)
    def get_user(self):
        # https://qyapi.weixin.qq.com/cgi-bin/user/list_id?access_token=ACCESS_TOKEN
        req = {
            "method": "POST",
            "url": f"{self.base_url}/cgi-bin/user/list_id?access_token={self.token}",
        }
        r = self.send(req)
        return r

test_member.py 文件

测试查询成员 ID 信息接口,使用复杂断言优化。

@allure.feature("成员管理")
class TestMember:
    def setup_class(self):
        self.member = Member()
        # 获取成员id结果
        self.r = self.member.get_user()
        # 接口响应示例
        self.user_ids_data = {...}

    @allure.story("使用 JSONPATH 断言查询结果")
    def test_get_member_by_jsonpath(self):
        # 使用 jsonpath 获取 ids
        ids = jsonpath(self.r.json(), "$..userid")
        assert "xing01" in ids
        assert len(ids) == 5

    @allure.story("使用 JSONSchema 断言查询结果")
    def test_get_member_by_jsonschema(self):
        '''
        jsonschema 简单验证
        :return:
        '''
        user_ids_jsonschema = Utils.generate_jsonschema(self.user_ids_data)
        # print("jsonschema的结构为:", user_ids_jsonschema)
        logger.info(f"JSONSchema 的结构为:{user_ids_jsonschema}")
        res = Utils.validate_schema(self.r.json(), user_ids_jsonschema)
        assert res

    @allure.story("使用 JSONSchema 文件断言查询结果")
    def test_member_by_jsonchema_file(self):
        '''
        jsonschema 文件验证
        :return:
        '''
        # 输出到data目录中
        file_path = f"{Utils.get_root_path()}/data/user_info.json"
        Utils.generate_jsonschema_by_file(self.user_ids_data, file_path)
        res = Utils.validate_schema_by_file(self.r.json(), file_path)
        assert res

Utils.py 文件

工具类增加 JSONSchema 的配置和数据库的配置。

@classmethod
def generate_jsonschema(cls, obj):
    '''
    生成 jsonschema 的数据
    :param obj:
    :return:
    '''
    # pip install genson
    builder = SchemaBuilder()
    builder.add_object(obj)
    return builder.to_schema()

@classmethod
def validate_schema(cls, data_obj, schema):
    '''
    通过 schema 验证数据
    :return:
    '''
    try:
        validate(data_obj, schema=schema)
        return True
    except Exception as e:
        print(f"结构体验证失败,失败原因是{e}")
        return False

@classmethod
def generate_jsonschema_by_file(cls, obj, file_path):
    '''
    生成 schema 数据保存到文件中
    :return:
    '''
    jsonschema_data = cls.generate_jsonschema(obj)
    with open(file_path, "w") as f:
        json.dump(jsonschema_data, f)

@classmethod
def validate_schema_by_file(cls, data_obj, schema_file):
    '''
    通过文件验证数据
    :param data_obj:
    :param schema:
    :return:
    '''
    with open(schema_file) as f:
        schema_data = json.load(f)
    return cls.validate_schema(data_obj, schema_data)

@classmethod
def get_conn(cls):
    conn = pymysql.connect(
        host="127.0.0.1",
        port=3306,
        user="root",
        password="123456",
        database="test_wework")
    return conn

# 执行 sql 语句
@classmethod
def execute_sql(cls, query):
    # 获取数据库连接对象 使用 with 语句块可以确保执行完毕后正确关闭数据库
    with cls.get_conn() as connect:
        try:
            # 创建游标对象
            with connect.cursor() as cursor:
                # 执行 sql 语句
                cursor.execute(query)
                # 获取所有记录
                record = cursor.fetchall()
                return record
        except pymysql.Error as e:
            print(f"SQL 执行错误信息为:{e}")
            return None

wework.py 文件

获取 token 步骤优化:

def get_token_by_file(self, key):
    '''
    从文件读取配置
    :param key: 应用名
    :return:
    '''
    # 获取 secret
    secret = self.data.get('corpsecret').get(key)
    # 拿到文件路径
    file_path = f'{self.path}/data/token.yaml'
    # 获取 token info 的信息  time_stamp 和 access_token
    try:
        token_data = Utils.get_yaml_data(file_path).get(key, {})
        time_stamp = token_data.get('time_stamp')
        access_token = token_data.get('access_token')
    except Exception as e:
        logger.info(f"获取数据出错,错误原因为:{e}")
        return e
    # 获取时间差
    time_step = int(time.time()) - time_stamp
    # 判断 token 是否存在 以及时间戳是否过期
    if access_token is None or time_step >= 7200:
        new_token = self.get_token(secret)
        # 写入新数据
        token_data.update({"time_stamp": int(time.time()), "access_token": new_token})
        Utils.add_yaml_data(token_data, file_path)
        # 返回新的token
        return new_token
    else:
        # 返回已有token
        return access_token

Java 版本

成员添加测试用例

@Slf4j
public class WeworkMember extends Wework{
    String SECRET;

    public WeworkMember() {
        // 读取配置
        super();
        SECRET = config.getContactsSecret();
    }

    /**
     * 获取成员id列表
     * @return
     */
    public List<String> getMemberIds() {
        String text = given()
                .queryParams("access_token", getToken(SECRET))
                .when()
                .post("/user/list_id")
                .then()
                .extract().response().getBody().asString();
        return JsonPath.read(text, "$..userid");
    }

}

    @BeforeAll
    static void setUp() throws IOException {
        weworkMember = new WeworkMember();
    }
    @Test
    void getMemberIdsTest(){
        assertThat(weworkMember.getMemberIds(),hasItem("ChenGengPeng"));
    }

多环境切换

public class ConfigUtil {

    /**
     * 获取项目配置文件
     *
     * @return 配置实体类
     */
    public static ConfigEntity getConfig() {
        try {
            ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
            HashMap<String,Object> configmap = mapper.readValue(ConfigEntity.class.getClassLoader().getResourceAsStream("config.yaml"), new TypeReference<HashMap<String,Object>>(){});
            String filePath = "config-" + configmap.get("env") + ".yaml";
            return mapper.readValue(ConfigEntity.class.getClassLoader().getResourceAsStream(filePath), ConfigEntity.class);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }
}
- config.yaml
env: test
- config-test.yaml
baseUrl: "https://qyapi.weixin.qq.com/cgi-bin"
corpId: "ww7f7d94d8b17ac268"
contactsSecret: "yFxjxy0uHfAl9QIjjAgwTiMTdjq16z-K9eSRfNZvqV4"
#proxy:
#  host: "localhost"
#  port: 8888


缓存token关键代码

@Override
public Response filter(FilterableRequestSpecification requestSpec, FilterableResponseSpecification responseSpec, FilterContext filterContext) {
    // 从缓存中获取token
    ObjectMapper mapper = new ObjectMapper();
    String secret = requestSpec.getRequestParams().get(SECRET_PARAM_NAME);
    TokenEntity tokenEntity = tokenCache.load(secret);
    // 如果缓存存在则直接返回token
    if (tokenEntity != null) {
        try {
            return new ResponseBuilder()
                    .setStatusCode(200)
                    .setContentType("application/json")
                    .setBody(mapper.writeValueAsString(tokenEntity))
                    .build();
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
    Response response = filterContext.next(requestSpec, responseSpec);
    TokenEntity te = response.body().as(TokenEntity.class);
    // 写入缓存
    tokenCache.dump(secret,te,te.getExpiresIn());
    return response;
}