专业手机网站怎么做,北京公司注销流程及费用,寄生虫seo教程,揭阳商城网站建设目录 生产者消费者模型
1. 生产者消费者模式的概念
2. 生产者消费者模型优点
编辑3. 生产者消费者模型的特点
基于BlockingQueue#xff08;阻塞队列#xff09;的生产者消费者模型
1.BlockingQueue
2. 使用CSTL中的queue来模拟实现阻塞队列
3. 基于任务的生产者消费…目录 生产者消费者模型
1. 生产者消费者模式的概念
2. 生产者消费者模型优点
编辑3. 生产者消费者模型的特点
基于BlockingQueue阻塞队列的生产者消费者模型
1.BlockingQueue
2. 使用CSTL中的queue来模拟实现阻塞队列
3. 基于任务的生产者消费者模型
4.生产消费过程是高效的
5.伪唤醒问题 生产者消费者模型
1. 生产者消费者模式的概念
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯而通过阻塞队列来进行通讯所以生产者生产完数据之后不用等待消费者处理直接扔给阻塞队列消费者不找生产者要数据而是直接从阻塞队列里取阻塞队列就相当于一个缓冲区平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
2. 生产者消费者模型优点
解耦支持并发支持忙闲不均
3. 生产者消费者模型的特点
生产者消费者模型是多线程同步与互斥的一个经典场景我们生活中也有许多的消费者与生产者模型下面我们来举一个生活中的生产者消费者模型的例子——超市 那么超市的作用是什么呢
超市可以收集需求减少交易成本从而提高效率超市相当于一个大号的缓存支持忙闲不均将生产环节与消费环节进行了解耦
下面我们再来分析一下生产者、消费者之间有什么关系
生产者——生产者竞争互斥关系消费者——消费者竞争互斥关系生产者——消费者互斥关系与同步关系 为什么消费者与消费者之间是互斥关系 假如说现在是世界末日超市里面现在就只有一包方便面然后你和另外一个人正好要去买这瓶水这是不是就是互斥关系了呢所以消费者与消费者之间是存在互斥关系的。 为什么生产者与消费者之间存在同步关系 生产者和消费者在操作上需要相互协调和配合以确保生产的顺利进行。例如在生产线上生产者需要等待上一道工序完成后才能进行下一道工序而消费者则需要等待生产者完成生产后才能进行消费。在这种情况下生产者和消费者需要在时间上保持同步以确保整个生产的协调和稳定。 了解了生产者消费者模型之后下面我来教大家一个方法让大家快速记住它。
321原则
3种关系生产者与生产者互斥关系、消费者与消费者互斥关系、生产者和消费者互斥关系、同步关系2种角色生产者和消费者1个交易场所特定结构的内存空间。比如说上面的超市通常指的是内存中的一段缓冲区。
基于BlockingQueue阻塞队列的生产者消费者模型
1.BlockingQueue
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
其与普通的队列区别
当队列为空时从队列获取元素的操作将会被阻塞直到队列中被放入了元素当队列满时往队列里存放元素的操作也会被阻塞直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的线程在对阻塞队列进程操作时会被阻塞) 2. 使用CSTL中的queue来模拟实现阻塞队列 为了便于同学们理解我们以单生产者单消费者来进行讲解。
#pragma once#include iostream
#include queue
#include pthread.husing namespace std;template class T
class BlockQueue
{static const int defaultnum 4;
public:BlockQueue(int maxcap defaultnum):maxcap_(maxcap){pthread_mutex_init(mutex_,nullptr);pthread_cond_init(c_cond_,nullptr);pthread_cond_init(p_cond_,nullptr);}// 谁来唤醒呢T pop(){pthread_mutex_lock(mutex_);if(q_.size() 0){pthread_cond_wait(c_cond_,mutex_);}T out q_.front();// 你想消费就直接能消费吗不一定。你得先确保消费条件满足q_.pop();pthread_mutex_unlock(mutex_);return out;}void push(const T num){pthread_mutex_lock(mutex_);if(q_.size() maxcap_){pthread_cond_wait(p_cond_,mutex_);}//1.队列没有满 2.被唤醒q_.push(num);// 你想生产就直接能生产吗不一定。你得先确保生产条件满足pthread_mutex_unlock(mutex_);}~BlockQueue(){pthread_mutex_destroy(mutex_);pthread_cond_destroy(c_cond_);pthread_cond_destroy(p_cond_);}
private:std::queueT q_; // 共享资源int maxcap_; // 极值pthread_mutex_t mutex_;pthread_cond_t c_cond_;pthread_cond_t p_cond_;};我们实现的是单生产者、单消费者的生产者-消费者模型所以我们主要关注生产者和消费者之间的同步与互斥关系。模型中不需要维护生产者与生产者、消费者与消费者之间的关系因为只有一个生产者和一个消费者。为了方便使用我们采用了模板化的数据存储在阻塞队列中。这个阻塞队列是临界资源因为它会被生产者和消费者同时访问。为了确保数据的一致性和正确性我们使用互斥锁来保护这个临界资源。当生产者想要向队列中Push数据时它首先需要检查队列是否还有空间。如果没有空间生产者会挂起等待直到队列中有可用空间。同样消费者在尝试从队列中Pop数据时也会先检查队列是否为空。如果为空消费者会挂起等待直到队列中有数据。因此在这里我们需要用到两个条件变量一个条件变量用来描述队列为空另一个条件变量用来描述队列已满。当阻塞队列满了的时候要进行生产的生产者线程就应该在full条件变量下进行等待当阻塞队列为空的时候要进行消费的消费者线程就应该在empty条件变量下进行等待。在这个模型中无论是生产者还是消费者它们在进入临界区之前都会先申请锁。如果条件不满足如队列满或队列空对应的线程会挂起。但此时该线程是拿着锁的为了避免死锁问题当线程调用pthread_cond_wait函数时就需要传入当前线程手中的互斥锁当该线程被挂起时它会释放手中的互斥锁。当线程被唤醒时它会重新获取这个互斥锁。这样即使在多线程环境中也能保证对临界资源的正确访问和数据的完整性。谁来唤醒谁最清楚队列里面有没有数据——生产者所以我们用生产者来唤醒消费者消费数据。谁最清楚队列里面有没有空间——消费者所以我们用消费者来唤醒生产者。 主函数
生产者是每隔一秒生产一个数据消费者每隔一秒消费一个数据
#include BlockQueue.hpp
#include unistd.hvoid* Consumer(void* args)
{BlockQueueint* bq static_castBlockQueueint*(args);//消费者不断进行生产while (true){int data bq-pop();cout 消费者消费了一个数据: data endl;sleep(1);}
}void* Productor(void* args)
{BlockQueueint* bq static_castBlockQueueint*(args);//生产者不断进行生产while (true){int data rand()%100;bq-push(data);cout 生产者生产了一个数据: data endl;sleep(1);}
}int main()
{//种一颗随机数种子srand((long long)time(nullptr));BlockQueueint* bq new BlockQueueint;//创建生产者线程和消费者线程pthread_t c,p;pthread_create(c,nullptr,Consumer,bq);pthread_create(p,nullptr,Productor,bq);pthread_join(c,nullptr);pthread_join(p,nullptr);delete bq;return 0;
} 阻塞队列要让生产者线程向队列中Push数据让消费者线程从队列中Pop数据因此这个阻塞队列必须要让这两个线程同时看到所以我们在创建生产者线程和消费者线程时需要将该阻塞队列作为线程执行例程的参数进行传入。 生产者消费者步调一致 代码运行后我们可以看到生产者和消费者的执行步调是一致的。
生产者生产的快消费者消费的慢 运行结果 我们让生产者每隔一秒生产一个数据消费者每隔两秒消费一个数据。过了一段时间后阻塞队列被塞满了数据此时生产者要进行等待同时通知消费者来消费此时消费者消费一个数据然后生产者被唤醒进而继续生产数据。此时就会变成生产者每生成一个数据消费者消费就会消费一个数据所以后面我们就看到了生产者和消费者步调又一致了。
生产者生产的慢消费者消费的快 运行结果 我们让生产者每隔两秒生产一个数据消费者每隔一秒消费一个数据。由于生产者生产的慢此时阻塞队列里面没数据所以消费者就需要进行等待直达到有数据了才可以进行消费因此消费者它的步调会随着生产者走。
满足某一条件时再唤醒对应的生产者或消费者
我们设置了两个参数low_water_ 和 int high_water_当队列中的数据小于low_water_时消费者唤醒生产者生产数据而当队列中的数据大于high_water_时生产者唤醒消费者进行消费数据。
#pragma once#include iostream
#include queue
#include pthread.husing namespace std;template class T
class BlockQueue
{static const int defaultnum 20;
public:BlockQueue(int maxcap defaultnum):maxcap_(maxcap){pthread_mutex_init(mutex_,nullptr);pthread_cond_init(c_cond_,nullptr);pthread_cond_init(p_cond_,nullptr);low_water_ maxcap_/3;high_water_ (maxcap_*2)/3;}// 谁来唤醒呢T pop(){pthread_mutex_lock(mutex_);if(q_.size() 0){pthread_cond_wait(c_cond_,mutex_);}T out q_.front();q_.pop();if(q_.size() low_water_){pthread_cond_signal(p_cond_);}pthread_mutex_unlock(mutex_);return out;}void push(const T num){pthread_mutex_lock(mutex_);if(q_.size() maxcap_){pthread_cond_wait(p_cond_,mutex_);}//1.队列没有满 2.被唤醒q_.push(num);if(q_.size() high_water_){pthread_cond_signal(c_cond_);}pthread_mutex_unlock(mutex_);}~BlockQueue(){pthread_mutex_destroy(mutex_);pthread_cond_destroy(c_cond_);pthread_cond_destroy(p_cond_);}
private:std::queueT q_; // 共享资源//int mincap_;int maxcap_; // 极值pthread_mutex_t mutex_;pthread_cond_t c_cond_;pthread_cond_t p_cond_;int low_water_;int high_water_;
};运行结果 可以看到我们这一次只有在队列的数据小于low_water_才会通知生产者进行生产数据只有在队列的数据大于high_water_的一半时才会通知消费者进行消费数据。
3. 基于任务的生产者消费者模型
我们的生产者消费者模型可不是只能像上面那样生产者生成一个数据消费者消费一个数据仅仅打印一个数字而已。
我们还可以自己定义一个Task的类然后实现一个基于计算任务的生产者消费者模型生产者生产任务消费者去处理这个任务然后计算出答案。
Task.hpp
#pragma once
#include iostream
#include stringusing namespace std;
string opers -*/%;enum{DivZero 1,ModZero,Unknown
};class Task
{
public:Task(int x,int y,char op):data1_(x),data2_(y),oper_(op),result_(0),exitcode_(0){ }void run(){switch (oper_){case : result_ data1_ data2_;break;case -: result_ data1_ - data2_;break;case *: result_ data1_ * data2_;break;case /: {if(data2_ 0) exitcode_ DivZero;else result_ data1_ / data2_;}break;case %: {if(data2_ 0) exitcode_ ModZero;else result_ data1_ % data2_;}break;default:exitcode_ Unknown;break;}}void operator()(){run();}string GetResult(){string r to_string(data1_);roper_;rto_string(data2_);r;rto_string(result_);r[code:;rto_string(exitcode_);r];return r;}string GetTask(){string r to_string(data1_);roper_;rto_string(data2_);r?;return r;}~Task(){}
private:int data1_;int data2_;char oper_;int result_;int exitcode_;
};
BlockQueue.hpp
#pragma once#include iostream
#include queue
#include pthread.husing namespace std;template class T
class BlockQueue
{static const int defaultnum 20;
public:BlockQueue(int maxcap defaultnum):maxcap_(maxcap){pthread_mutex_init(mutex_,nullptr);pthread_cond_init(c_cond_,nullptr);pthread_cond_init(p_cond_,nullptr);// low_water_ maxcap_/3;// high_water_ (maxcap_*2)/3;}// 谁来唤醒呢T pop(){pthread_mutex_lock(mutex_);while(q_.size() 0) // 因为判断临界资源调试是否满足也是在访问临界资源判断资源是否就绪是通过再临界资源内部判断的。{// 如果线程wait时被误唤醒了呢 pthread_cond_wait(c_cond_,mutex_); // 你是持有锁的1. 调用的时候自动释放锁因为唤醒而返回的时候重新持有锁}T out q_.front(); // 你想消费就直接能消费吗不一定。你得先确保消费条件满足q_.pop();// if(q_.size() low_water_) pthread_cond_signal(p_cond_);pthread_cond_signal(p_cond_);pthread_mutex_unlock(mutex_);return out;}void push(const T num){pthread_mutex_lock(mutex_);while(q_.size() maxcap_)// 这里用while作为判断条件做到防止线程被伪唤醒的情况{// 伪唤醒情况pthread_cond_wait(p_cond_,mutex_);//1. 调用的时候自动释放锁 2.}//1.队列没有满 2.被唤醒q_.push(num);// 你想生产就直接能生产吗不一定。你得先确保生产条件满足// if(q_.size() high_water_) pthread_cond_signal(c_cond_);pthread_cond_signal(c_cond_);pthread_mutex_unlock(mutex_);}~BlockQueue(){pthread_mutex_destroy(mutex_);pthread_cond_destroy(c_cond_);pthread_cond_destroy(p_cond_);}
private:std::queueT q_; // 共享资源, q被当做整体使用的q只有一份加锁。但是共享资源也可以被看做多份int maxcap_; // 极值pthread_mutex_t mutex_;pthread_cond_t c_cond_;pthread_cond_t p_cond_;// int low_water_;// int high_water_;
};main.c
#include BlockQueue.hpp
#include Task.hpp
#include unistd.h
#include ctimevoid* Consumer(void* args)
{BlockQueueTask* bq static_castBlockQueueTask*(args);while (true){//消费Task t bq-pop();//计算// t.run();t();cout 处理任务 t.GetTask() 处理任务的结果是 t.GetResult() thread_id: pthread_self() endl;// sleep(1);}
}void* Productor(void* args)
{int len opers.size();BlockQueueTask* bq static_castBlockQueueTask*(args);int x 10,y 20;while (true){//模拟生产者生产数据int data1 rand()%101;// [1,10]usleep(10);int data2 rand()%10;// [0,9]char op opers[rand()%len];Task t(data1,data2,op);//生产bq-push(t);cout 生产者生产了一个任务: t.GetTask() thread id: pthread_self() endl;sleep(1);}
}int main()
{//种一颗随机数种子srand((long long)time(nullptr));BlockQueueTask* bq new BlockQueueTask;pthread_t c[3],p[5];for(int i 0;i 3;i){pthread_create(ci,nullptr,Consumer,bq);}for(int i 0;i 5;i){pthread_create(pi,nullptr,Productor,bq);}for(int i 0;i 3;i){pthread_join(c[i],nullptr);}for(int i 0;i 5;i){pthread_join(p[i],nullptr);}delete bq;return 0;
}
运行结果 可以看到如此以来我们的生产者不断的往阻塞队列里面放一个又一个的Task对象然后消费者拿到一个又一个的Task对象之后对他们进行处理从而得到运行结果。
我们从代码中可以看到生产者和消费者进行生产或者消费数据时都要进行加锁所以这个生产和消费在访问仓库的过程之中本身就是串行的这样的话生产消费还能高效吗答案是可以的这是因为生产消费不只是存数据和获取数据的过程他还有前置和后置的工作要进行处理下面我们来具体说明一下
4.生产消费过程是高效的
我们看下面这张图如果我们只看红色方框里面的内容生产者和消费者就是生产者生产数据和消费者获取数据而且生产者生产数据和消费数据是同步关系的消费者必须要等生产者生产数据之后再来消费如果我们只是这样看生产者消费者模型的话效率是低下的
但是实际上生产者消费者模型还是前置和后置的工作
生产者获取数据是需要花时间的生产者生产数据包括两个步骤1.获取数据 2.生产数据到队列那么生产者的数据从哪里来呢生产者的数据实际上是从用户或者网络等途径获取数据然后再生产数据到队列这个过程是需要花时间的消费者消费数据也是要花时间的消费者消费数据包括两个步骤1.消费数据 2.加工处理数据从上面基于任务的生产者消费者模型的过程我们可以看到消费者获取数据后还需要将数据进行计算这个过程也是要花时间的
我们来看下面这个图进行加深理解 所以虽然这个生产和消费在访问仓库的过程之中本身就是串行的生产的时候不能进行消费消费的时候不能进行生产但是由于生产和消费都是需要花时间的那么如果在生产者进行生产的时候消费者进行加工处理数据而消费者进行消费的时候生产者进行获取数据这样的话生产和消费不就是并发进行的吗一个在访问临界区的代码一个在访问非临界区的代码这样两个线程就高效并发的调度运行起来了是不是生产消费的效率就提高了 那么我们还有一个问题在我们上面基于任务的生产者消费者模型当中我们创建了多个生产者线程和多个消费者线程这样可以提高效率吗 答案是可以的从运行结果我们可以看到每次生产和消费的线程id都是不一样的。这样当其中一个生产者进行申请锁的时候其它线程也可以并发的进行获取数据其中一个消费者进行申请锁的时候其它消费者线程可以并发的进行处理数据。这样效率就大大提高了 5.伪唤醒问题
伪唤醒问题是指线程但此时条件判断还不满足但是线程却因为伪唤醒而运行后续的代码这可能导致程序运行异常或错误。下面我们来具体说明伪唤醒问题 假设阻塞队列已经被写满了消费者正在运行消费之后唤醒一个生产者然后消费者进行解锁这个时候阻塞队列只有一个位置能进行生产。由于刚开始阻塞队列是满的所以许多生产者线程要进行生产却不能进行这个时候如果消费者在唤醒的时候多次使用了pthread_cond_signal或者使用了pthread_cond_broadcast。那么在一个条件变量等待的多个生产者线程都被唤醒了。这个时候这些生产者线程就不再在条件变量下进行等待了。这样第一个拿到锁的生产者从pthread_cond_wait()开始往下执行程序进行生产数据生产完数据之后阻塞队列又满了生产者又进行唤醒消费者进行消费数据然后释放锁。但这个时候不一定是消费者拿到锁刚刚被唤醒的多个生产者也可能拿到锁。而这个时候如果是生产者拿到了锁函数并不是从头开始执行的而是继续从pthread_cond_wait()开始往下执行程序进行当由于阻塞队列已经满了这个时候生产者再向阻塞队列进行push就会产生错误这种情况的被唤醒的生产者线程就被称为伪唤醒状态。 那么如何解决伪唤醒问题呢
判断是否满足生产消费条件时不能用if而应该用while
为了避免伪唤醒问题我们在循环中检查线程等待条件也就是说当线程被唤醒时我们不能直接往后执行而是要让它重新判断一次是否满足条件确保程序在满足结束条件的情况下退出。
void push(const T num)
{pthread_mutex_lock(mutex_);while(q_.size() maxcap_)// 这里用while作为判断条件做到防止线程被伪唤醒的情况{// 伪唤醒情况pthread_cond_wait(p_cond_,mutex_);//1. 调用的时候自动释放锁 2.}//1.队列没有满 2.被唤醒q_.push(num);// 你想生产就直接能生产吗不一定。你得先确保生产条件满足// if(q_.size() high_water_) pthread_cond_signal(c_cond_);pthread_cond_signal(c_cond_);pthread_mutex_unlock(mutex_);
}
这样增加了循环检查线程等待条件当条件不满足时就会重新进入休眠状态等待唤醒。