公司网站后台更新,北京网站建设公司册,网站的管理维护,有哪些免费做简历的网站一、多线程使用
1.使用threading库
# 使用threading进行调用多线程
from threading import Thread 2.使用
(1)使用函数方法
thread Thread(target方法对象, #不要括号args(参数1, 参数2) #一个参数也需要加上逗号
)
# 创建一个线程对象
thread.start(…一、多线程使用
1.使用threading库
# 使用threading进行调用多线程
from threading import Thread 2.使用
(1)使用函数方法
thread Thread(target方法对象, #不要括号args(参数1, 参数2) #一个参数也需要加上逗号
)
# 创建一个线程对象
thread.start()
# 执行线程对象
(2)使用类方法
import threading
import timeclass TestThread(threading.Thread):def __init__(self, n):super(TestThread, self).__init__()self.n ndef run(self):print(TestThread类的类方法方式创建多线程, self.n)time.sleep(0.5)t1TestThread(参数1)
t2TestThread(参数2) 3.小demo
from threading import Thread
from time import sleepdef work(name):for i in range(5):print(name)sleep(0.5)t1 Thread(targetwork,args(名字A,)
)
t2 Thread(targetwork,args(名字B,)
)
t1.start()
t2.start()
print(---结束---)
# 执行结果名字A
名字B
---结束---
名字A名字B
名字A名字B
名字B名字A
名字A名字B
Process finished with exit code 0两个子线程开始后主线程从上到下运行结束但两个子线程t1/t2还在运行4.等待执行 从demo中可以发现主线程结束了两个子线程还没结束使用join方法可以让主线程等待子线程执行完成后结束。 #使用方法join其目的是加入主线程等待
from threading import Thread
from time import sleepdef work(name):for i in range(5):print(name)sleep(0.5)t1 Thread(targetwork,args(名字A,)
)
t2 Thread(targetwork,args(名字B,)
)
t1.start()
t2.start()
t1.join()
t2.join()print(---结束---)名字A
名字B
名字A
名字B
名字B
名字A
名字A
名字B
名字B
名字A
---结束---
Process finished with exit code 0将t1和t2加入主线程主线程在等待t1/t2运行结束后才结束运行二、多线程压测构思
以最常使用压测的场景秒杀活动订单 为例子 1.生成随机且未注册手机号并登录 - 存数据为{“手机号”“登录token”},多个使用列表遍历 2.访问商品列表 - 读取秒杀商品,有多个选择第一个 3.秒杀商品提交订单 - 断言结果为success 不考虑其他操作和环境限制 三、实现
以秒杀商品订单为例,创建skilOrder包,py文件以sk_开头
1.工具类
# 工具类
ToolMySQL - 输入sql查询库并返回结果
ToolFakerDate - 生成一个库里未使用过的手机号
2.sk_variable.py 存储公共使用变量
class Variable:pass3.sk_setting.py
# 存放各类url
url1
url2
url3 # 存放请求头
header {Content-Type: application/json,User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36,xxx:xxx,...
}# 存放skillOrder中需要的前置条件/变量
4.sk_register.py
import requests
from jsonpath import jsonpathfrom seckillorder.sk_setting import 注册_url, head
from seckillorder.sk_variable import Variable
from 工具类 import ToolFakerDateclass RegisterLogin:# 注册并登录,存储登录信息[{“手机号”“登录token”},...]# 设置成Variable类属性def __init__(self,times):self.times timesself.fakerphone ToolFakerDate()def fakerphonelist(self):register_list []num 0while num self.times:c self.fakerphone.get_new_phone()if c not in register_list:register_list.append(c)num 1log.info(生成的手机号列表:, register_list)return register_listdef reslogin(self):log.info(-------------reslogin_start---------------)register_list self.fakerphonelist()login_info {}for i in register_list:data {xxx:xxx,.......# 根据注册接口所需参数填写}resq requests.post(urltes_url, headersheader, jsondata)if jsonpath(resq.json(),msg)[0] success:print(i, resq.text)login_info[f{i}] jsonpath(resq.json(),token)[0]setattr(Variable, login_info, login_info)log.info(login_info, login_info)log.info(-------------reslogin_end---------------)5.sk_product.py
import json
from threading import Threadimport requests
from jsonpath import jsonpathfrom seckillorder.sk_setting import 商品列表_url, head, 下单_url
from seckillorder.sk_variable import Variableclass GetProduce:# 获取商品列表信息,查找对应秒杀商品并暂存# 设置成Variable类属性def getproduceinfo(self):resq requests.get(url商品列表_url, headershead)for i in resq返回的内容:# 从响应体中去获取提交秒杀商品订单需要的参数def orderinfo(token):data [{xxx:xxx,# 提交订单所需要的参数}]header[token] tokenresq requests.post(url下单_url, headershead, jsondata)log.info(token, resq.text)log.info(header)class SumbimOrder:def SubmitOrder(self, method, token):theadlist []for k, v in token.items():log.info(k, v)thread Thread(targetmethod,args(v,))theadlist.append(thread)thread.start()for thread in theadlist:thread.join()四、写在最后 写在最后 感觉不像是想象中的多线程压测脚本 在创建完线程时候就已经完成下单了并发下单才有压测的感觉 断言只需要查库判断对应订单号是否创建和订单支付状态即可 写着玩图个乐 欢迎大佬指点迷津