网站程序 wap pc 同步,像网站的ppt怎么做的,个人网站是商业的吗,百度服务电话自适应限流
服务的处理能力是有客观上限的。当请求速度超过服务的处理速度时#xff0c;服务就会过载。
如果服务持续过载#xff0c;会导致越来越多的请求积压#xff0c;最终所有的请求都必须等待较长时间才能被处理#xff0c;从而使整个服务处于瘫痪状态。
与之相对…自适应限流
服务的处理能力是有客观上限的。当请求速度超过服务的处理速度时服务就会过载。
如果服务持续过载会导致越来越多的请求积压最终所有的请求都必须等待较长时间才能被处理从而使整个服务处于瘫痪状态。
与之相对的如果直接拒绝掉一部分请求反而能够让服务能够及时处理更多的请求。对应的方法就是设置最大并发。
自适应限流能动态调整服务的最大并发在保证服务不过载的前提下让服务尽可能多的处理请求。
使用场景
通常情况下要让服务不过载只需在上线前进行压力测试并通过little’s law计算出best_max_concurrency就可以了并发度 时延 * QPS)。但在服务数量多拓扑复杂且处理能力会逐渐变化的局面下使用固定的最大并发会带来巨大的测试工作量很不方便。自适应限流就是为了解决这个问题。
使用自适应限流前建议做到
客户端开启了重试功能。服务端有多个节点。这样当一个节点返回过载时客户端可以向其他的节点发起重试从而尽量不丢失流量。
brpc开启自适应限流方法
目前只有method级别(即具体的rpc服务方法)支持自适应限流。如果要为某个method开启自适应限流只需要将它的最大并发设置为auto即可。
// Set auto concurrency limiter for all methods
brpc::ServerOptions options;
options.method_max_concurrency auto;// Set auto concurrency limiter for specific method
server.MaxConcurrencyOf(example.EchoService.Echo) auto;原理
名词及解释 concurrency(并发度): 同时处理的请求数又被称为“并发度”。 max_concurrency: 设置的最大并发度。超过并发的请求会被拒绝返回ELIMIT错误在集群层面client应重试到另一台server上去。 best_max_concurrency: 并发的物理含义是任务处理槽位天然存在上限这个上限就是best_max_concurrency也就是最佳的最大并发度一般推荐设置最大并发为该值若max_concurrency设置的过大则concurrency可能大于best_max_concurrency任务将无法被及时处理而暂存在各种队列中排队系统也会进入拥塞状态。若max_concurrency设置的过小则concurrency总是会小于best_max_concurrency限制系统达到本可以达到的更高吞吐。 noload_latency: 单纯处理任务的延时不包括排队时间。另一种解释是低负载的延时。由于正确处理任务得经历必要的环节其中会耗费cpu或等待下游返回noload_latency是一个服务固有的属性但可能随时间逐渐改变由于内存碎片压力变化业务数据变化等因素。 min_latency: 实际测定的latency中的较小值的ema当concurrency不大于best_max_concurrency时min_latency和noload_latency接近(可能轻微上升。 peak_qps: 服务处理qps的上限。注意是处理或回复的qps而不是接收的qps。值取决于best_max_concurrency / noload_latency这两个量都是服务的固有属性故peak_qps也是服务的固有属性和拥塞状况无关但可能随时间逐渐改变。 max_qps: 实际测定的qps中的较大值。由于qps具有上限max_qps总是会小于peak_qps不论拥塞与否。 - Little’s Law 在服务处于稳定状态时: concurrency latency * qps。 这是自适应限流的理论基础。
当服务没有超载时随着流量的上升latency基本稳定(接近noload_latency)qps和concurrency呈线性关系一起上升。
当流量超过服务的peak_qps时则concurrency和latency会一起上升而qps会稳定在peak_qps。
假如一个服务的peak_qps和noload_latency都比较稳定那么它的best_max_concurrency noload_latency * peak_qps。
自适应限流就是要找到服务的noload_latency和peak_qps 并将最大并发设置为靠近两者乘积的一个值。
自适应限流计算公式
自适应限流会不断的对请求进行采样当采样窗口的样本数量足够时会根据样本的平均延迟和服务当前的qps计算出下一个采样窗口的max_concurrency:
max_concurrency max_qps * ((2alpha) * min_latency - latency)alpha为可接受的延时上升幅度默认0.3。latency是当前采样窗口内所有请求的平均latency。max_qps是最近一段时间测量到的qps的极大值。min_latency是最近一段时间测量到的latency较小值的ema是noload_latency的估算值。
注意当计算出来的 max_concurrency 和当前的 max_concurrency 的值不同时每次对 max_concurrency 的调整的比例有一个上限让 max_concurrency 的变化更为平滑。
当服务处于低负载时min_latency约等于noload_latency此时计算出来的max_concurrency会高于concurrency但低于best_max_concurrency给流量上涨留探索空间。而当服务过载时服务的qps约等于max_qps同时latency开始明显超过min_latency此时max_concurrency则会接近concurrency并通过定期衰减避免远离best_max_concurrency保证服务不会过载。 估算noload_latency
服务的noload_latency并非是一成不变的自适应限流必须能够正确的探测noload_latency的变化。当noload_latency下降时是很容感知到的因为这个时候latency也会下降。难点在于当latency上涨时需要能够正确的辨别到底是服务过载了还是noload_latency上涨了。
可能的方案有
取最近一段时间的最小latency来近似noload_latency
取最近一段时间的latency的各种平均值来预测noload_latency
收集请求的平均排队等待时间使用latency - queue_time作为noload_latency
每隔一段时间缩小max_concurrency过一小段时间后以此时的latency作为noload_latency方案1和方案2的问题在于假如服务持续处于高负载那么最近的所有latency都会高出noload_latency从而使得算法估计的noload_latency不断升高。
方案3的问题在于假如服务的性能瓶颈在下游服务那么请求在服务本身的排队等待时间无法反应整体的负载情况。
方案4是最通用的也经过了大量实验的考验。缩小max_concurrency和公式中的alpha存在关联。让我们做个假想实验若latency极为稳定并都等于min_latency那么公式简化为max_concurrency max_qps * latency * (1 alpha)。根据little’s lawqps最多为max_qps * (1 alpha). alpha是qps的探索空间若alpha为0则qps被锁定为max_qps算法可能无法探索到peak_qps。但在qps已经达到peak_qps时alpha会使延时上升已拥塞此时测定的min_latency会大于noload_latency一轮轮下去最终会导致min_latency不收敛。定期降低max_concurrency就是阻止这个过程并给min_latency下降提供探索空间。 减少重测时的流量损失
每隔一段时间自适应限流算法都会缩小max_concurrency并持续一段时间然后将此时的latency作为服务的noload_latency以处理noload_latency上涨了的情况。测量noload_latency时必须让先服务处于低负载的状态因此对max_concurrency的缩小是难以避免的。
由于max_concurrency concurrency时服务会拒绝掉所有的请求限流算法将排空所有的经历过排队等待的请求的时间 设置为 latency * 2 以确保用于计算min_latency的样本绝大部分都是没有经过排队等待的。
由于服务的latency通常都不会太长这种做法所带来的流量损失也很小。 应对抖动
即使服务自身没有过载latency也会发生波动根据Little’s Lawlatency的波动会导致server的concurrency发生波动。
我们在设计自适应限流的计算公式时考虑到了latency发生抖动的情况: 当latency与min_latency很接近时根据计算公式会得到一个较高max_concurrency来适应concurrency的波动从而尽可能的减少“误杀”。同时随着latency的升高max_concurrency会逐渐降低以保护服务不会过载。
从另一个角度来说当latency也开始升高时通常意味着某处(不一定是服务本身也有可能是下游服务)消耗了大量CPU资源这个时候缩小max_concurrency也是合理的。 平滑处理
为了减少个别窗口的抖动对限流算法的影响同时尽量降低计算开销计算min_latency时会通过使用EMA来进行平滑处理
if latency min_latency: min_latency latency * ema_alpha (1 - ema_alpha) * min_latency else: do_nothing
估算peak_qps 提高qps增长的速度
当服务启动时由于服务本身需要进行一系列的初始化tcp本身也有慢启动等一系列原因。服务在刚启动时的qps一定会很低。这就导致了服务启动时的max_concurrency也很低。而按照上面的计算公式当max_concurrency很低的时候预留给qps增长的冗余concurrency也很低(即alpha * max_qps * min_latency)。从而会影响当流量增加时服务max_concurrency的增加速度。
假如从启动到打满qps的时间过长这期间会损失大量流量。在这里我们采取的措施有两个
采样方面一旦采到的请求数量足够多直接提交当前采样窗口而不是等待采样窗口的到时间了才提交
计算公式方面当current_qps 保存的max_qps时直接进行更新不进行平滑处理。在进行了这两个处理之后绝大部分情况下都能够在2秒左右将qps打满。 平滑处理
为了减少个别窗口的抖动对限流算法的影响同时尽量降低计算开销在计算max_qps时会通过使用EMA来进行平滑处理
if current_qps max_qps: max_qps current_qps else: max_qps current_qps * ema_alpha / 10 (1 - ema_alpha / 10) * max_qps
将max_qps的ema参数置为min_latency的ema参数的十分之一的原因是: max_qps 下降了通常并不意味着极限qps也下降了。而min_latency下降了通常意味着noload_latency确实下降了。 与netflix gradient算法的对比
netflix中的gradient算法公式为max_concurrency min_latency / latency * max_concurrency queue_size。
其中latency是采样窗口的最小latencymin_latency是最近多个采样窗口的最小latency。min_latency / latency就是算法中的梯度当latency大于min_latency时max_concurrency会逐渐减少反之max_concurrency会逐渐上升从而让max_concurrency围绕在best_max_concurrency附近。
这个公式可以和本文的算法进行类比 gradient算法中的latency和本算法的不同前者的latency是最小值后者是平均值。netflix的原意是最小值能更好地代表noload_latency但实际上只要不对max_concurrency做定期衰减不管最小值还是平均值都有可能不断上升使算法不收敛。最小值并不能带来额外的好处反而会使算法更不稳定。 gradient算法中的max_concurrency / latency从概念上和qps有关联根据little’s law)但可能严重脱节。比如在重测 min_latency前若所有latency都小于min_latency那么max_concurrency会不断下降甚至到0但按照本算法max_qps和min_latency仍然是稳定的它们计算出的max_concurrency也不会剧烈变动。究其本质gradient算法在迭代max_concurrency时latency并不能代表实际并发为max_concurrency时的延时两者是脱节的所以max_concurrency / latency的实际物理含义不明与qps可能差异甚大最后导致了很大的偏差。 gradient算法的queue_size推荐为sqrt(max_concurrency)这是不合理的。netflix对queue_size的理解大概是代表各种不可控环节的缓存比如socket里的和max_concurrency存在一定的正向关系情有可原。但在我们的理解中这部分queue_size作用微乎其微没有或用常量即可。我们关注的queue_size是给concurrency上升留出的探索空间: max_concurrency的更新是有延迟的在并发从低到高的增长过程中queue_size的作用就是在max_concurrency更新前不限制qps上升。而当concurrency高时服务可能已经过载了queue_size就应该小一点防止进一步恶化延时。这里的queue_size和并发是反向关系。
服务端代码实现
#include gflags/gflags.h
#include butil/logging.h
#include brpc/server.h
#include butil/atomicops.h
#include butil/time.h
#include butil/logging.h
#include json2pb/json_to_pb.h
#include bthread/timer_thread.h
#include bthread/bthread.h#include cstdlib
#include fstream
#include cl_test.pb.hDEFINE_int32(logoff_ms, 2000, Maximum duration of servers LOGOFF state (waiting for client to close connection before server stops));
DEFINE_int32(server_bthread_concurrency, 4, Configuring the value of bthread_concurrency, For compute max qps, );
DEFINE_int32(server_sync_sleep_us, 2500, Usleep time, each request will be executed once, For compute max qps);
// max qps 1000 / 2.5 * 4 DEFINE_int32(control_server_port, 9000, );
DEFINE_int32(echo_port, 9001, TCP Port of echo server);
DEFINE_int32(cntl_port, 9000, TCP Port of controller server);
DEFINE_string(case_file, , File path for test_cases);
DEFINE_int32(latency_change_interval_us, 50000, Intervalt for server side changes the latency);
DEFINE_int32(server_max_concurrency, 0, Echo Servers max_concurrency);
DEFINE_bool(use_usleep, false, EchoServer uses ::usleep or bthread_usleep to simulate latency when processing requests);bthread::TimerThread g_timer_thread;int cast_func(void* arg) {return *(int*)arg;
}void DisplayStage(const test::Stage stage) {std::string type;switch(stage.type()) {case test::FLUCTUATE: type Fluctuate;break;case test::SMOOTH:type Smooth;break;default:type Unknown;}std::stringstream ss;ss Stage:[ stage.lower_bound() : stage.upper_bound() ] , Type: type;LOG(INFO) ss.str();
}butil::atomicint cnt(0);
butil::atomicint atomic_sleep_time(0);
bvar::PassiveStatusint atomic_sleep_time_bvar(cast_func, atomic_sleep_time);namespace bthread {
DECLARE_int32(bthread_concurrency);
}void TimerTask(void* data);class EchoServiceImpl : public test::EchoService {
public:EchoServiceImpl() : _stage_index(0), _running_case(false) {};virtual ~EchoServiceImpl() {};void SetTestCase(const test::TestCase test_case) {_test_case test_case;_next_stage_start _test_case.latency_stage_list(0).duration_sec() butil::gettimeofday_s();_stage_index 0;_running_case false;DisplayStage(_test_case.latency_stage_list(_stage_index));}void StartTestCase() {CHECK(!_running_case);_running_case true;UpdateLatency();}void StopTestCase() {_running_case false;}void UpdateLatency() {if (!_running_case) {return;}ComputeLatency();g_timer_thread.schedule(TimerTask, (void*)this, butil::microseconds_from_now(FLAGS_latency_change_interval_us));}virtual void Echo(google::protobuf::RpcController* cntl_base,const test::NotifyRequest* request,test::NotifyResponse* response,google::protobuf::Closure* done) {brpc::ClosureGuard done_guard(done); response-set_message(hello);::usleep(FLAGS_server_sync_sleep_us);if (FLAGS_use_usleep) {::usleep(_latency.load(butil::memory_order_relaxed));} else {bthread_usleep(_latency.load(butil::memory_order_relaxed));}}void ComputeLatency() {if (_stage_index _test_case.latency_stage_list_size() butil::gettimeofday_s() _next_stage_start) {_stage_index;if (_stage_index _test_case.latency_stage_list_size()) {_next_stage_start _test_case.latency_stage_list(_stage_index).duration_sec();DisplayStage(_test_case.latency_stage_list(_stage_index));}}if (_stage_index _test_case.latency_stage_list_size()) {const test::Stage latency_stage _test_case.latency_stage_list(_stage_index - 1);if (latency_stage.type() test::ChangeType::FLUCTUATE) {_latency.store((latency_stage.lower_bound() latency_stage.upper_bound()) / 2,butil::memory_order_relaxed);} else if (latency_stage.type() test::ChangeType::SMOOTH) {_latency.store(latency_stage.upper_bound(), butil::memory_order_relaxed);}return;}const test::Stage latency_stage _test_case.latency_stage_list(_stage_index);const int lower_bound latency_stage.lower_bound();const int upper_bound latency_stage.upper_bound();if (latency_stage.type() test::FLUCTUATE) {_latency.store(butil::fast_rand_less_than(upper_bound - lower_bound) lower_bound,butil::memory_order_relaxed); } else if (latency_stage.type() test::SMOOTH) {int latency lower_bound (upper_bound - lower_bound) / double(latency_stage.duration_sec()) * (latency_stage.duration_sec() - _next_stage_start butil::gettimeofday_s());_latency.store(latency, butil::memory_order_relaxed);} else {LOG(FATAL) Wrong Type: latency_stage.type();}}private:int _stage_index;int _next_stage_start;butil::atomicint _latency;test::TestCase _test_case;bool _running_case;
};void TimerTask(void* data) {EchoServiceImpl* echo_service (EchoServiceImpl*)data;echo_service-UpdateLatency();
}class ControlServiceImpl : public test::ControlService {
public:ControlServiceImpl() : _case_index(0) {LoadCaseSet(FLAGS_case_file);_echo_service new EchoServiceImpl;if (_server.AddService(_echo_service,brpc::SERVER_OWNS_SERVICE) ! 0) {LOG(FATAL) Fail to add service;}g_timer_thread.start(NULL);}virtual ~ControlServiceImpl() { _echo_service-StopTestCase();g_timer_thread.stop_and_join(); };virtual void Notify(google::protobuf::RpcController* cntl_base,const test::NotifyRequest* request,test::NotifyResponse* response,google::protobuf::Closure* done) {brpc::ClosureGuard done_guard(done);const std::string message request-message();LOG(INFO) message;if (message ResetCaseSet) {_server.Stop(0);_server.Join();_echo_service-StopTestCase();LoadCaseSet(FLAGS_case_file);_case_index 0;response-set_message(CaseSetReset);} else if (message StartCase) {CHECK(!_server.IsRunning()) Continuous StartCase;const test::TestCase test_case _case_set.test_case(_case_index);_echo_service-SetTestCase(test_case);brpc::ServerOptions options;options.max_concurrency FLAGS_server_max_concurrency;_server.MaxConcurrencyOf(test.EchoService.Echo) test_case.max_concurrency();_server.Start(FLAGS_echo_port, options); _echo_service-StartTestCase();response-set_message(CaseStarted);} else if (message StopCase) {CHECK(_server.IsRunning()) Continuous StopCase;_server.Stop(0);_server.Join();_echo_service-StopTestCase();response-set_message(CaseStopped);} else {LOG(FATAL) Invalid message: message;response-set_message(Invalid Cntl Message);}}private:void LoadCaseSet(const std::string file_path) {std::ifstream ifs(file_path.c_str(), std::ios::in); if (!ifs) {LOG(FATAL) Fail to open case set file: file_path;}std::string case_set_json((std::istreambuf_iteratorchar(ifs)), std::istreambuf_iteratorchar()); test::TestCaseSet case_set;std::string err;if (!json2pb::JsonToProtoMessage(case_set_json, case_set, err)) {LOG(FATAL) Fail to trans case_set from json to protobuf message: err;}_case_set case_set;ifs.close();}brpc::Server _server;EchoServiceImpl* _echo_service;test::TestCaseSet _case_set;int _case_index;
};int main(int argc, char* argv[]) {// Parse gflags. We recommend you to use gflags as well.GFLAGS_NS::ParseCommandLineFlags(argc, argv, true);bthread::FLAGS_bthread_concurrency FLAGS_server_bthread_concurrency;brpc::Server server;ControlServiceImpl control_service_impl;if (server.AddService(control_service_impl, brpc::SERVER_DOESNT_OWN_SERVICE) ! 0) {LOG(ERROR) Fail to add service;return -1;}if (server.Start(FLAGS_cntl_port, NULL) ! 0) {LOG(ERROR) Fail to start EchoServer;return -1;}server.RunUntilAskedToQuit();return 0;
}
客户端代码实现
#include gflags/gflags.h
#include butil/logging.h
#include butil/time.h
#include brpc/channel.h
#include bvar/bvar.h
#include bthread/timer_thread.h
#include json2pb/json_to_pb.h#include fstream
#include cl_test.pb.hDEFINE_string(protocol, baidu_std, Protocol type. Defined in src/brpc/options.proto);
DEFINE_string(connection_type, , Connection type. Available values: single, pooled, short);
DEFINE_string(cntl_server, 0.0.0.0:9000, IP Address of server);
DEFINE_string(echo_server, 0.0.0.0:9001, IP Address of server);
DEFINE_int32(timeout_ms, 3000, RPC timeout in milliseconds);
DEFINE_int32(max_retry, 0, Max retries(not including the first RPC));
DEFINE_int32(case_interval, 20, Intervals for different test cases);
DEFINE_int32(client_qps_change_interval_us, 50000, The interval for client changes the sending speed);
DEFINE_string(case_file, , File path for test_cases);void DisplayStage(const test::Stage stage) {std::string type;switch(stage.type()) {case test::FLUCTUATE: type Fluctuate;break;case test::SMOOTH:type Smooth;break;default:type Unknown;}std::stringstream ss;ss Stage:[ stage.lower_bound() : stage.upper_bound() ] , Type: type;LOG(INFO) ss.str();
}uint32_t cast_func(void* arg) {return *(uint32_t*)arg;
}butil::atomicuint32_t g_timeout(0);
butil::atomicuint32_t g_error(0);
butil::atomicuint32_t g_succ(0);
bvar::PassiveStatusuint32_t g_timeout_bvar(cast_func, g_timeout);
bvar::PassiveStatusuint32_t g_error_bvar(cast_func, g_error);
bvar::PassiveStatusuint32_t g_succ_bvar(cast_func, g_succ);
bvar::LatencyRecorder g_latency_rec;void LoadCaseSet(test::TestCaseSet* case_set, const std::string file_path) {std::ifstream ifs(file_path.c_str(), std::ios::in); if (!ifs) {LOG(FATAL) Fail to open case set file: file_path;}std::string case_set_json((std::istreambuf_iteratorchar(ifs)), std::istreambuf_iteratorchar()); std::string err;if (!json2pb::JsonToProtoMessage(case_set_json, case_set, err)) {LOG(FATAL) Fail to trans case_set from json to protobuf message: err;}
}void HandleEchoResponse(brpc::Controller* cntl,test::NotifyResponse* response) {// std::unique_ptr makes sure cntl/response will be deleted before returning.std::unique_ptrbrpc::Controller cntl_guard(cntl);std::unique_ptrtest::NotifyResponse response_guard(response);if (cntl-Failed() cntl-ErrorCode() brpc::ERPCTIMEDOUT) {g_timeout.fetch_add(1, butil::memory_order_relaxed);LOG_EVERY_N(INFO, 1000) cntl-ErrorText();} else if (cntl-Failed()) {g_error.fetch_add(1, butil::memory_order_relaxed);LOG_EVERY_N(INFO, 1000) cntl-ErrorText();} else {g_succ.fetch_add(1, butil::memory_order_relaxed);g_latency_rec cntl-latency_us();}}void Expose() {g_timeout_bvar.expose_as(cl, timeout);g_error_bvar.expose_as(cl, failed);g_succ_bvar.expose_as(cl, succ);g_latency_rec.expose(cl);
}struct TestCaseContext {TestCaseContext(const test::TestCase tc) : running(true), stage_index(0), test_case(tc), next_stage_sec(test_case.qps_stage_list(0).duration_sec() butil::gettimeofday_s()) {DisplayStage(test_case.qps_stage_list(stage_index));Update();}bool Update() {if (butil::gettimeofday_s() next_stage_sec) {stage_index;if (stage_index test_case.qps_stage_list_size()) {next_stage_sec test_case.qps_stage_list(stage_index).duration_sec(); DisplayStage(test_case.qps_stage_list(stage_index));} else {return false;}}int qps 0;const test::Stage qps_stage test_case.qps_stage_list(stage_index);const int lower_bound qps_stage.lower_bound();const int upper_bound qps_stage.upper_bound();if (qps_stage.type() test::FLUCTUATE) {qps butil::fast_rand_less_than(upper_bound - lower_bound) lower_bound;} else if (qps_stage.type() test::SMOOTH) {qps lower_bound (upper_bound - lower_bound) / double(qps_stage.duration_sec()) * (qps_stage.duration_sec() - next_stage_sec butil::gettimeofday_s());}interval_us.store(1.0 / qps * 1000000, butil::memory_order_relaxed);return true;}butil::atomicbool running;butil::atomicint64_t interval_us;int stage_index;const test::TestCase test_case;int next_stage_sec;
};void RunUpdateTask(void* data) {TestCaseContext* context (TestCaseContext*)data;bool should_continue context-Update();if (should_continue) {bthread::get_global_timer_thread()-schedule(RunUpdateTask, data, butil::microseconds_from_now(FLAGS_client_qps_change_interval_us));} else {context-running.store(false, butil::memory_order_release);}
}void RunCase(test::ControlService_Stub cntl_stub, const test::TestCase test_case) {LOG(INFO) Running case: test_case.case_name() \;brpc::Channel channel;brpc::ChannelOptions options;options.protocol FLAGS_protocol;options.connection_type FLAGS_connection_type;options.timeout_ms FLAGS_timeout_ms;options.max_retry FLAGS_max_retry;if (channel.Init(FLAGS_echo_server.c_str(), options) ! 0) {LOG(FATAL) Fail to initialize channel;}test::EchoService_Stub echo_stub(channel);test::NotifyRequest cntl_req;test::NotifyResponse cntl_rsp;brpc::Controller cntl;cntl_req.set_message(StartCase);cntl_stub.Notify(cntl, cntl_req, cntl_rsp, NULL);CHECK(!cntl.Failed()) control failed;TestCaseContext context(test_case);bthread::get_global_timer_thread()-schedule(RunUpdateTask, context, butil::microseconds_from_now(FLAGS_client_qps_change_interval_us));while (context.running.load(butil::memory_order_acquire)) {test::NotifyRequest echo_req;echo_req.set_message(hello);brpc::Controller* echo_cntl new brpc::Controller;test::NotifyResponse* echo_rsp new test::NotifyResponse;google::protobuf::Closure* done brpc::NewCallback(HandleEchoResponse, echo_cntl, echo_rsp);echo_stub.Echo(echo_cntl, echo_req, echo_rsp, done);::usleep(context.interval_us.load(butil::memory_order_relaxed));}LOG(INFO) Waiting to stop case: test_case.case_name() \;::sleep(FLAGS_case_interval);cntl.Reset();cntl_req.set_message(StopCase);cntl_stub.Notify(cntl, cntl_req, cntl_rsp, NULL);CHECK(!cntl.Failed()) control failed;LOG(INFO) Case test_case.case_name() finshed:;
}int main(int argc, char* argv[]) {// Parse gflags. We recommend you to use gflags as well.GFLAGS_NS::ParseCommandLineFlags(argc, argv, true);Expose();brpc::Channel channel;brpc::ChannelOptions options;options.protocol FLAGS_protocol;options.connection_type FLAGS_connection_type;options.timeout_ms FLAGS_timeout_ms;if (channel.Init(FLAGS_cntl_server.c_str(), options) ! 0) {LOG(ERROR) Fail to initialize channel;return -1;}test::ControlService_Stub cntl_stub(channel);test::TestCaseSet case_set;LoadCaseSet(case_set, FLAGS_case_file);brpc::Controller cntl;test::NotifyRequest cntl_req;test::NotifyResponse cntl_rsp;cntl_req.set_message(ResetCaseSet);cntl_stub.Notify(cntl, cntl_req, cntl_rsp, NULL);CHECK(!cntl.Failed()) Cntl Failed;for (int i 0; i case_set.test_case_size(); i) {RunCase(cntl_stub, case_set.test_case(i));}LOG(INFO) EchoClient is going to quit;return 0;
}proto
syntaxproto2;
package test;option cc_generic_services true;message NotifyRequest {required string message 1;
};message NotifyResponse {required string message 1;
};enum ChangeType {FLUCTUATE 1; // Fluctuating between upper and lower bound SMOOTH 2; // Smoothly rising from the lower bound to the upper bound
}message Stage {required int32 lower_bound 1;required int32 upper_bound 2;required int32 duration_sec 3;required ChangeType type 4;
}message TestCase {required string case_name 1;required string max_concurrency 2;repeated Stage qps_stage_list 3;repeated Stage latency_stage_list 4;
}message TestCaseSet {repeated TestCase test_case 1;
}service ControlService {rpc Notify(NotifyRequest) returns (NotifyResponse);
}service EchoService {rpc Echo(NotifyRequest) returns (NotifyResponse);
};