百度网站建设解决方案,重庆最新消息今天,奉贤做网站的,网站建设时间规划表#x1f308;个人主页#xff1a;Fan_558 #x1f525; 系列专栏#xff1a;Linux #x1f339;关注我#x1f4aa;#x1f3fb;带你学更多操作系统知识 文章目录 前言一、生产消费模型#xff08;1#xff09;概念引入#xff08;2#xff09;生产消费模型的优点… 个人主页Fan_558 系列专栏Linux 关注我带你学更多操作系统知识 文章目录 前言一、生产消费模型1概念引入2生产消费模型的优点3生产消费模型的特点 二、基于阻塞队列的生产消费模型三、基于环形队列的生产消费模型1环形队列的生产消费模型特点 小结 前言
本文将会向你介绍基于阻塞队列和环形队列的生产消费模型
一、生产消费模型
1概念引入
1、什么是生产者消费者模型 生产者和消费是操作系统中一种重要的模型它描述的是一种等待和通知的机制 2、本文定义 生产者 产生数据的模块就形象地称为生产者 消费者 而处理数据的模块就称为消费者 缓冲区 生产者和消费者之间的中介就叫做缓冲区。 什么是阻塞队列 在多线程编程中阻塞队列(BlockingQueue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于当队列为空时从队列获取元素的操作将会被阻塞直到队列中被放入了元素当队列满时往队列里存放元素的操作也会被阻塞直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的线程在对阻塞队列进程操作时会被阻塞) 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯而通过阻塞队列来进行通讯所以生产者生产完数据之后不用等待消费者处理直接扔给阻塞队列消费者不找生产者要数据而是直接从阻塞队列里取阻塞队列就相当于一个缓冲区平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。为什么要使用生产者消费者模型归根结底来说生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。 2生产消费模型的优点 优点1、解决了强耦合问题 假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法那么生产者对于消费者就会产生依赖也就是耦合。将来如果消费者的代码发生变化可能会影响到生产者。而如果两者都依赖于某个缓冲区两者之间不直接依赖耦合也就相应降低了。 就好比说你需要买零食你还要给生产厂家打电话然后他如果没有的话还要制作然后在交给你这多麻烦呀万一厂家哪天电话一变相当于生产者线程的代码变化导致消费者的代码也要变化那么你还得问问电话是多少。 优点2、支持并发concurrency即生产者和消费者可以是两个独立的并发主体互不干扰的运行。 生产者直接调用消费者的某个方法还有另一个弊端。由于函数调用是同步的或者叫阻塞的在消费者的方法没有返回之前生产者只好一直等在那边。万一消费者处理数据很慢生产者就会白白糟蹋大好时光。 使用了生产者消费者模式之后生产者和消费者可以是两个独立的并发主体常见并发类型有进程和线程两种。生产者把制造出来的数据往缓冲区一丢就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。 优点3、支持忙闲不均 如果制造数据的速度时快时慢缓冲区可以对其进行适当缓冲。当数据制造快的时候消费者来不及处理未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来消费者再慢慢处理掉。 3生产消费模型的特点
首先生产者只需要关心“仓库”并不需要关心具体的消费者。对于消费者而言它不需要关心具体的生产者它只需要关心这个“仓库”中还有没有东西存在。生产者生产的时候消费者不能进行“消费”消费者消费的时候生产者不能生产相当于一种互斥关系 即生产者和消费者一次只能有一人能访问到“仓库”。“仓库”为空时不能进行消费。“仓库”满时不能进行生产。 综上所述可以记忆三二一原则 即三种关系、两个角色、一个场所 1、三种关系生产者与生产者互斥 举个例子生产商作为生产者的身份生产者需要获得利益的最大化当然是希望自己一家独大那么生产者和生产者之间就是互斥的最起码在1号生产商正在生产时2号不能来捣乱体现在代码中就是互斥生产者与消费者同步与互斥 互斥举个例子比如当前你正要使用一号间厕所此时清洁人员来了那么他就需要在外面等待同理他正在清理一号间你也不能进入一号间 同步举个例子如果你当前正在使用一号间厕所那么清洁人员是可以清洁二号间的消费者与消费者互斥 举个例子两个人不能同时使用一间厕所
2、两个角色
生产者消费者
3、一个场所
缓冲区
基于以上特点我们来编写代码吧
二、基于阻塞队列的生产消费模型
//Task.hpp#pragma once
#include iostream
#include string
std::string oper_-*/%;
enum{DivZero 1,ModZero 2,UnKnown
};class Task
{
public:Task(int x, int y, char op):data1_(x),data2_(y),oper_(op),result_(0),exitcode_(0){}//计算
void run()
{switch(oper_){case :result_ data1_ data2_;break;case -:result_ data1_ - data2_;break;case *:result_ data1_ * data2_;break;case /:{if(data2_ 0){exitcode_ DivZero;}else result_ data1_ / data2_;}break;case %:if(data2_ 0){exitcode_ ModZero;}else result_ data1_ % data2_;break;default:exitcode_ UnKnown;break;}
}
void operator()()
{run();
}
//获取结果信息
std::string GetResult()
{std::string r std::to_string(data1_);r ;r oper_;r ;r std::to_string(data2_);r ;r std::to_string(result_);r [code:;r std::to_string(exitcode_);r ];return r;
}
//获取任务信息
std::string GetTask()
{std::string r std::to_string(data1_);r ;r oper_;r ;r std::to_string(data2_);r ?;return r;
}
private:int data1_;int data2_;char oper_;int result_;int exitcode_;
};//BlockQueue.hpp#include Task.hpp
#include iostream
#include queue
#include pthread.h
#include unistd.h
template class T
class BlockQueue
{//初始值static const int defalutnum 10;
public:BlockQueue(int maxcap defalutnum):maxcap_(maxcap){pthread_mutex_init(mutex_, nullptr);pthread_cond_init(c_cond_, nullptr);pthread_cond_init(p_cond_, nullptr);//定义上下限满足下限就唤醒生产者生产满足上限就唤醒消费者消费low_water_ maxcap_ / 3;high_water_ (maxcap_*2) / 3;}//消费const T pop(){pthread_mutex_lock(mutex_);//条件变量while(q_.size() 0){//进入等待队列中排队pthread_cond_wait(c_cond_, mutex_);}T out q_.front();q_.pop();if(q_.size() low_water_){pthread_cond_signal(p_cond_);}//唤醒消费者pthread_mutex_unlock(mutex_);return out;}//生产void push(const T in){pthread_mutex_lock(mutex_);//条件变量while(q_.size() maxcap_){//进入等待队列中排队pthread_cond_wait(p_cond_, mutex_);}//生产一个数据q_.push(in);//唤醒消费者if(q_.size() high_water_){pthread_cond_signal(c_cond_);}pthread_mutex_unlock(mutex_);}//析构~BlockQueue(){pthread_mutex_destroy(mutex_);pthread_cond_destroy(c_cond_);pthread_cond_destroy(p_cond_);}private:std::queueT q_; //共享资源阻塞队列int maxcap_; //最大容量pthread_mutex_t mutex_; //锁pthread_cond_t p_cond_; //生产者条件变量pthread_cond_t c_cond_; //消费者条件变量int low_water_;int high_water_;
};值得一提的是条件变量的判断部分我们不能使用if而是要用while原因是防止产生伪唤醒的现象消费者只消费了一个但是唤醒时如果我们使用broadcast去唤醒生产者线程它们就不会在条件变量下等待了而是都去竞争这个锁了。如果此时一个生产者线程竞争成功后就去生产假设只剩下一个空位供生产然后它生产后紧接着唤醒消费者线程去消费然后解锁但是此时还有另外两个生产者线程在竞争锁他们都在条件变量下等待然后有可能又是生产者线程拿到锁消费者却没有竞争到锁继续向后执行但是呢if条件不会再判断了函数返回继续向下执行结果就是生产多了已经没有空间去生产了这就是伪唤醒状态 总而言之while循环会让我们对上述背景中等待的生产者们再次判断是否满足生产的条件防止产生伪唤醒的现象
//main.cc#include BlockQueue.hpp
#include Task.hpp
#include stdlib.h
//生产
void* Productor(void* args)
{BlockQueueTask *bq static_castBlockQueueTask *(args);//sleep(5);while(true){int number1 rand() % 10 1;usleep(10);//让随机数number2出现0的情况int number2 rand() % 10;int r rand() % 5;Task t(number1, number2, oper_[r]);bq-push(t); //传入任务信息std::cout thread id: pthread_self() 传入任务 t.GetTask() std::endl;sleep(1);}
}
//消费
void* Consumer(void* args)
{BlockQueueTask *bq static_castBlockQueueTask *(args);while(true){Task ret bq-pop();ret();std::cout thread id: pthread_self() run... ret.GetResult() std::endl;sleep(1);}
}int main()
{//创建指向阻塞队列的指针其中队列中放着一个个Task对象任务BlockQueueTask *bq new BlockQueueTask();pthread_t c[3], p[5];//创建生产、消费线程for(int i 0; i 5; i){pthread_create(p i, nullptr, Productor, bq);}for(int i 0; i 3; i){pthread_create(c i, nullptr, Consumer, bq);}//线程等待for(int i 0; i 5; i){pthread_join(p[i], nullptr);}for(int i 0; i 3; i){pthread_join(c[i], nullptr);}delete bq;return 0;
}运行结果
三、基于环形队列的生产消费模型
1环形队列的生产消费模型特点
环形队列采用数组模拟用模运算来模拟环状特性。 环形结构起始状态和结束状态都是一样的不好判断为空或者为满所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置作为满的状态。但是我们现在有信号量这个计数器就很简单的进行多线程间的同步过程
初始化信号量
#include semaphore.h
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数pshared:0表示线程间共享非零表示进程间共享value信号量初始值销毁信号量
int sem_destroy(sem_t *sem);等待信号量
功能等待信号量使信号量-1若信号量为负数那么线程就会阻塞等待。
int sem_wait(sem_t *sem); //P()发布信号量
功能发布信号量表示资源使用完毕可以归还资源了。将信号量值加1。若信号量没达到最大值则说明有现成正在阻塞等待V操作就会唤醒一个线程。
int sem_post(sem_t *sem);//V()于是我们就可以定义两个信号量 N表示环形队列缓冲区中能存放的空间数量 默认DataSem初始生产数据值为0
注意 代码如下省略Task.hpp
//ringQueue#pragma once
#include iostream
#include vector
#include semaphore.hconst static int defaultcap 5;templateclass T
class RingQueue{
private://申请信号量void P(sem_t sem){sem_wait(sem);}//释放信号量void V(sem_t sem){sem_post(sem);}void Lock(pthread_mutex_t mutex){pthread_mutex_lock(mutex);}void Unlock(pthread_mutex_t mutex){pthread_mutex_unlock(mutex);}
public:RingQueue(int cap defaultcap):ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0){sem_init(cdata_sem_, 0, 0);sem_init(pspace_sem_, 0, cap);pthread_mutex_init(c_mutex_, nullptr);pthread_mutex_init(p_mutex_, nullptr);}//生产void Push(const T in) {//判断: 申请空间信号量失败阻塞P(pspace_sem_);//加锁Lock(p_mutex_);ringqueue_[p_step_] in;p_step_;p_step_ % cap_;Unlock(p_mutex_);//释放数据信号量V(cdata_sem_);}//消费void Pop(T *out) {//判断: 申请数据信号量失败阻塞P(cdata_sem_);//加锁Lock(c_mutex_);*out ringqueue_[c_step_];c_step_;c_step_ % cap_;Unlock(c_mutex_);//释放空间信号量V(pspace_sem_);}~RingQueue(){sem_destroy(cdata_sem_);sem_destroy(pspace_sem_);pthread_mutex_destroy(c_mutex_);pthread_mutex_destroy(p_mutex_);}
private:std::vectorT ringqueue_;int cap_;int c_step_; // 消费者下标int p_step_; // 生产者下标sem_t cdata_sem_; // 消费者关注的数据资源sem_t pspace_sem_; // 生产者关注的空间资源pthread_mutex_t c_mutex_;pthread_mutex_t p_mutex_;
};//main.cc#include ringQueue.hpp
#include Task.hpp
#include stdlib.h
#include unistd.h
#include stringstruct ThreadData
{RingQueueTask *rq; //指向环形队列的指针std::string threadname; //线程名
};//生产
void* Productor(void* args)
{ThreadData *td static_castThreadData *(args);RingQueueTask *rq td-rq;//sleep(5);while(true){int number1 rand() % 10 1;usleep(10);//让随机数number2出现0的情况int number2 rand() % 10;int r rand() % 5;Task t(number1, number2, oper_[r]);rq-Push(t); //传入任务信息std::cout 我是: td-threadname 任务信息: t.GetTask() std::endl;sleep(1);}return nullptr;
}
//消费
void* Consumer(void* args)
{ThreadData *td static_castThreadData *(args);RingQueueTask *rq td-rq;Task t;while(true){rq-Pop(t);t();std::cout 我是: td-threadname result: t.GetResult() std::endl;sleep(1);}return nullptr;
}int main()
{//创建指向阻塞队列的指针其中队列中放着一个个Task对象任务srand(time(nullptr) ^ getpid());RingQueueTask *rq new RingQueueTask(50);pthread_t c[3], p[5];for (int i 0; i 5; i){ThreadData *td new ThreadData();td-rq rq;td-threadname Productor- std::to_string(i);pthread_create(p i, nullptr, Productor, td);}for (int i 0; i 3; i){ThreadData *td new ThreadData();td-rq rq;td-threadname Consumer- std::to_string(i);pthread_create(c i, nullptr, Consumer, td);}for (int i 0; i 5; i){pthread_join(p[i], nullptr);}for (int i 0; i 3; i){pthread_join(c[i], nullptr);}delete rq;return 0;
}运行结果
小结
今日的分享就到这里啦如果本文存在疏漏或错误的地方还请您能够指出