模拟炒股网站开发,php网站标题修改,python做的网站有哪些,装修网站模板下载线程篇下讲的是基于阻塞队列的生产者消费者模型。在学习这个之前我们先了解一些其他概念#xff1a;
同步#xff1a;在保证数据安全的条件下#xff0c;让线程按某种特定的顺序依次访问临界资源。
通过上一节的代码我们实现了一个多线程抢票的程序#xff0c;但结果显示…线程篇下讲的是基于阻塞队列的生产者消费者模型。在学习这个之前我们先了解一些其他概念
同步在保证数据安全的条件下让线程按某种特定的顺序依次访问临界资源。
通过上一节的代码我们实现了一个多线程抢票的程序但结果显示的是一个线程在疯狂的抢票这就导致了其他线程抢占不到临界资源而导致的饥饿问题。
所以在抢代码的代码逻辑中我们需要保证各个线程以同步的方式进行抢票。那如何实现呢就需要用到信号量
条件变量的接口函数 条件变量的使用
#include iostream
#include string
#include unistd.h
#include pthread.hint tickets 1000;
pthread_mutex_t mutex PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond PTHREAD_COND_INITIALIZER;void *start_routine(void *args)
{std::string name static_castconst char *(args);while (true){pthread_mutex_lock(mutex);pthread_cond_wait(cond, mutex); // 为什么要有mutex后面就说// 判断暂时省略std::cout name - tickets std::endl;tickets--;pthread_mutex_unlock(mutex);}
}int main()
{// 通过条件变量控制线程的执行pthread_t t[5];for (int i 0; i 5; i){char *name new char[64];snprintf(name, 64, thread %d, i 1);pthread_create(ti, nullptr, start_routine, name);}while (true){sleep(1);// pthread_cond_signal(cond);pthread_cond_broadcast(cond);std::cout main thread wakeup one thread... std::endl;}for (int i 0; i 5; i){pthread_join(t[i], nullptr);}return 0;
}
条件变量的理解
我们用一个例子来说明
没有条件变量就好比是面试官在一个办公室面试只要应聘者得到门外挂着的钥匙进来就可以面试。这时一个应聘者抢到了钥匙进入了办公室面试结束后想要将钥匙归还时感觉自己面的不好这时自己离钥匙也是最近的所以刚出门又拿起了钥匙继续面试。这钥匙就导致了其他应聘者拿不到而这个应聘者在疯狂的面试很明显这不合理。
但是有了条件变量就好比是面试官在外面设置了一个等待区面试完的人必须归还钥匙并且再次回到等待队列的最末尾重新排队这就保证了各个应聘者都有机会面试。 条件不满足的时候线程必须去某些定义好的条件变量上进行等待。
pthread_cond_wait第二个参数的意义将锁传进去:该函数调用的时候会以原子性的方式将锁释放并将线程挂起等待。用pthread_cond_sign将线程唤醒时该线程会拿着你所传入的锁将代码继续向下运行。通过以上知识的铺垫让我们进入今天的正题
生产者消费者模型
什么是生产者消费者模型呢我们用一个形象的图来说明 生产者消费者模型就是生产者可以将生产的数据存放在一个共享区消费者从共享区中获得数据进行消费。由于是共享区就要保证数据的安全性。当生产者生产一个数据时另一个生产者不能在同一个数据上生产不然会导致数据的不安全性。因此生产者和生产者之间是互斥关系的。同样消费者也不能同时消费同一个数据不然可能会导致数据的不一致性因此消费者和消费者之间是互斥关系的。如果这时一个生产者正在生产一个数据而同时消费者也正在消费这个数据。就可能导致数据还没生产完全就已经被消费了也会导致数据的不安全性。因此生产者和消费者之间是互斥关系同时我们还希望生产者生产以后就有消费者来消费消费者消费完一个就有生产者来生产因此生产者和消费者之间是同步关系。
总结一下就是3种关系、两种角色、一个交易场所 这一段特殊的缓冲区可以提前存放一批数据这样消费者想消费的时候消费生产者想什么时候生产就什么时候生产。解决了生产和消费两批线程忙闲不均的问题是它们不具有强耦合的关系。
基于blockqueue的生产和消费模型 基于普通方式处理数据
代码实现
//Main.cc#include iostream
#include unistd.h
#include time.h
#include BlockQueue.hppusing namespace std;void* consumer(void* args)
{BlockQueueint* bq static_castBlockQueueint*(args);while(true){int val 0;bq-pop(val);cout我是消费者我消费了一个数字valendl;sleep(1);}return nullptr;
}void* productor(void* args)
{BlockQueueint* bq static_castBlockQueueint*(args);while(true){int val rand()%10;bq-push(val);cout我是生产者,我生产了一个数字valendl;//sleep(1);}return nullptr;
}int main()
{srand((unsigned long)time(nullptr)^getpid());BlockQueueint* queue new BlockQueueint(5);pthread_t c,p;pthread_create(c,nullptr,consumer,(void*)queue);pthread_create(p,nullptr,productor,(void*)queue);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}//BlockQueue.hpp
#include queue
#include pthread.h
#include iostream
using namespace std;const int NUM 5;
templateclass T
class BlockQueue
{
public:BlockQueue(const int numsize NUM):_maxsize(NUM){pthread_mutex_init(_lock,nullptr);pthread_cond_init(_ccond,nullptr);pthread_cond_init(_ccond,nullptr);}void push(const T in){pthread_mutex_lock(_lock);while(is_Full()){//这里说明阻塞队列是满的需要让生产者等待pthread_cond_wait(_pcond,_lock);}//这里说明阻塞队列至少有一个空位可以插入_queue.push(in);//唤醒消费者去消费pthread_cond_signal(_ccond);pthread_mutex_unlock(_lock);}void pop(T* out){pthread_mutex_lock(_lock);while(is_Empty()){//这里说明阻塞队列是空的需要让消费者等待pthread_cond_wait(_ccond,_lock);}//这里说明阻塞队列至少有一个数据*out_queue.front();_queue.pop();//唤醒生产者生产数据pthread_cond_signal(_pcond);pthread_mutex_unlock(_lock);}~BlockQueue(){pthread_mutex_destroy(_lock);pthread_cond_destroy(_ccond);pthread_cond_destroy(_pcond);}private:bool is_Full(){return _queue.size()_maxsize;}bool is_Empty(){return _queue.empty();}private:queueT _queue;int _maxsize;pthread_mutex_t _lock; //保护临界资源的锁pthread_cond_t _ccond; //消费者的条件变量pthread_cond_t _pcond; //生产者的条件变量
};
代码细节 基于计算器任务的Task
我们得创建一个task.hpp,里面定义一个CalTask类
class CalTask
{
public:using func_t std::functionint(int,int,char);CalTask(){}CalTask(int x,int y,char op,func_t func):_x(x),_y(y),_op(op),_callback(func){}std::string operator()(){int result _callback(_x,_y,_op);char buffer[64];snprintf(buffer,sizeof buffer,%d %c %d %d,_x,_op,_y,result);return buffer;}std::string to_string(){char buffer[64];snprintf(buffer,sizeof buffer,%d %c %d ?,_x,_op,_y);return buffer;}~CalTask(){}private:int _x;int _y;char _op;func_t _callback;};
生产者任务
void* productor(void* args)
{BlockQueueCalTask* bq static_castBlockQueueCalTask*(args);while(true){int x rand()%10;int y rand()%10;char op op_str[rand()%op_str.size()];CalTask task(x,y,op,mymath);bq-push(task);coutproductor task:task.to_string()endl;//sleep(1);}return nullptr;
}
消费者任务
void* consumer(void* args)
{BlockQueueCalTask* bq static_castBlockQueueCalTask*(args);while(true){CalTask t;bq-pop(t);string result t();coutconsumer result:resultendl;sleep(1);}return nullptr;
} 基于两个阻塞队列实现计算与存储
将计算和存储的两个队列放进同一个类中
templateclass c,class s
class BlockQueues
{public:BlockQueuec* c_bq; BlockQueues* s_bq;
};
main函数
int main()
{srand((unsigned long)time(nullptr)^getpid());BlockQueuesCalTask,SaveTask bqs;bqs.c_bq new BlockQueueCalTask;bqs.s_bq new BlockQueueSaveTask;//BlockQueueCalTask* bqs new BlockQueueCalTask(5);pthread_t c,p,s;pthread_create(c,nullptr,consumer,(void*)bqs);pthread_create(p,nullptr,productor,(void*)bqs);pthread_create(s,nullptr,saver,(void*)bqs);pthread_join(c,nullptr);pthread_join(p,nullptr);pthread_join(s,nullptr);delete bqs.c_bq;delete bqs.s_bq;return 0;
}
存储任务
class SaveTask
{typedef std::functionvoid(const std::string) func_t;
public:SaveTask(){}SaveTask(const std::string message, func_t func): _message(message), _func(func){}void operator()(){_func(_message);}
private:std::string _message;func_t _func;
};void Save(const std::string message)
{const std::string target ./log.txt;FILE *fp fopen(target.c_str(), a);if(!fp){std::cerr fopen error std::endl;return;}fputs(message.c_str(), fp);fputs(\n, fp);fclose(fp);
}
存储线程执行的任务
void *saver(void *bqs_)
{BlockQueueSaveTask *save_bq (static_castBlockQueuesCalTask, SaveTask *(bqs_))-s_bq;while(true){SaveTask t;save_bq-pop(t);t();std::cout save thread,保存任务完成... std::endl; }return nullptr;
}
代码的执行结果 生产者与消费者模型优势总结
这个模型的优势不在于多个线程能够并发式的对共享资源里面的数据进行访问和处理而是多个线程能够在加载任务和处理任务的时候进行并发处理。 在我们上面的代码逻辑中很简单就是随机构造x和y构成一个任务最后放入阻塞队列中。而在实际情况中加载任务的时候没有那么简单有时需要从网络或者数据库中加载这就需要消耗很长的一段时间。这个模型的优势就是在于多个生产者能同时加载多个任务随后竞争出一名生产者将任务放入共享资源阻塞队列中然后在竞争出一名消费者取出任务。因此模型的优势不在于多个线程能并发的从阻塞队列中拿数据处理而是在加载任务的时候做到并发节省时间。同理处理任务当一个线程处理任务的同时另一个线程仍可以从阻塞队列取出任务处理不影响之前的进程处理任务因此消费者处理任务的环节也做到了并发实现了加载任务和处理任务的解耦提高了整个程序的运行效率和速度。
到这里本章的内容就全部结束了创作不易希望大家多多点赞支持。