沈阳定制网站制作,手机网站建设公司排名,科技创新可以被分成三种类型,wordpress别人主题插件目录
线程互斥
锁的初始化
加锁
解锁
锁的初始化
锁的原理
死锁
线程同步
方案一#xff1a;条件变量
条件变量初始化
等待
唤醒
条件变量的代码示例
基于阻塞队列的生产消费模型
方案二#xff1a;POSIX信号量
初始化信号量#xff1a;
销毁信号量
等待信…目录
线程互斥
锁的初始化
加锁
解锁
锁的初始化
锁的原理
死锁
线程同步
方案一条件变量
条件变量初始化
等待
唤醒
条件变量的代码示例
基于阻塞队列的生产消费模型
方案二POSIX信号量
初始化信号量
销毁信号量
等待信号量(P())
发布信号量(V())
基于环形队列的生产消费模型
线程池 下面是几个在前面的学习中提及到的相关的概念
临界资源多线程执行流共享的资源就叫做临界资源临界区每个线程内部访问临界资源的代码就叫做临界区互斥任何时刻互斥保证有且只有一个执行流进入临界区访问临界资源通常对临界资源起保护作用原子性可能会被调度机制影响但是该操作只有两态要么完成要么未完成 线程互斥
多个线程访问同一个全局变量并对它进行数据计算在并发访问的时候可能会导致数据不一致的问题例如下述的抢票的问题 有一个全局变量ticket假定刚开始有5000张票线程一执行抢票的操作在代码中执行ticket--的时候会有三步操作第一步读取数据到CPU内的寄存器中第二步在CPU内部进行计算第三步将计算的结果写回内存
当线程一刚执行完第二步操作将ticket--变为了4999准备执行第三步操作时这时线程一被切换走于是将4999保存在线程一的上下文数据中与此同时线程二开始进行抢票操作由于线程一的4999没有写回内存所以线程二所取得的ticket依旧是5000此时线程二也执行上述的三步操作当线程二执行了一段时间ticket变为了3000的时候线程二被切换走该线程一执行时线程一存储的ticket却是4999此时线程一执行操作三ticket又变为了4999这里就出现了数据不一致的问题
为了解决上述问题就需要加锁保护下面介绍如何加锁保护
pthread_mutex_t是原生线程库提供的一个数据类型pthread_mutex_t mtx可以定义一个锁
锁的初始化
对于锁需要初始化(也叫互斥量初始化)
第一种方法可以直接使用pthread_mutex_init进行初始化
第二种方法这个锁如果是全局的或是静态定义的就可以使用PTHREAD_MUTEX_INITIALIZER这样的宏进行初始化
下面先演示使用第二种方法的场景 而在getticket中这段区域能够访问临界资源是临界区需要加锁保护 加锁
加锁可以使用pthread_mutex_lock 直接将刚刚定义的锁mtx取地址然后传入pthread_mutex_lock即可
如下所示在每次循环进入临界区前加锁 每一个线程执行抢票的语句时都会执行这个代码这个锁的特点是任何一个时刻只允许一个线程成功的获得这把锁然后向后运行执行下面的代码而其他没有拿到锁的线程只能默认阻塞等待直到拿到锁的线程把锁释放掉其他线程才能进入
有加锁自然也要有解锁在临界区后需要解锁
解锁
解锁需要用到pthread_mutex_unlock 代码中的位置如下图所示 之所以在else中也要解锁是因为如果加锁后没有进入if语句而是进入了else语句这时break出去全局的锁依旧处于加锁状态其他线程就无法继续向后执行了所以这里在else中也需要有解锁语句
需要注意加锁的时候需要注意力度越小越好加锁的代码越多效率越低因为加锁后就变为了串行执行了 锁的初始化
pthread库所提供的pthread_mutex_init可以初始化一把锁如下 pthread_mutex_init需要包含头文件pthread.h
上面说到初始化有两种方式上面演示的是锁是全局的下面演示锁不是全局的情况将锁设置在main函数中这种情况就需要用到pthread_mutex_init进行初始化了
函数参数
第一个参数是定义的锁的地址
第二个参数锁的属性(一般设为nullptr即可)
返回值
成功返回0失败返回错误码
使用方式如下所示 锁mtx不是全局的所以需要用到pthread_mutex_init进行初始化在末尾还需要pthread_mutex_destroy释放锁 在学习了上面的加锁保护后思考以下几个问题
①加了锁之后线程在临界区中是否会被切换被切换会产生问题吗
当然会被切换且不会产生任何问题。虽然被切换了但是当前线程是持有锁被切换的所以其他抢票的线程也必须先申请锁才能够执行临界区的代码而锁是被当前线程持有的所以不会申请成功所以也就不会让其他线程进入临界区中这样就保证了临界区中数据的一致性
②原子性体现在哪
在没有持有锁的线程2的角度看只有两种情况对于线程2有意义第一线程1没有持有锁(什么都没做)第二线程1释放锁(已经做完)此时线程2可以申请锁
所以上述情况可以反映出线程1在持有锁期间对其他线程来说就是原子性的要不没有做要么就是已经做完
③加锁后就是串行执行了吗
对的对于临界区的代码来说一定是串行执行的
④每个线程必须先申请锁再访问临界资源所以每一个线程都必须看到同一把锁并且访问它这就说明锁本身就是共享的资源而锁保证全局数据的安全谁又来保证锁的安全呢
为了保证锁的安全申请和释放锁必须是原子性的所以原子性的申请锁这样才能够做到一个线程申请锁直到它完成整个动作释放锁后其他线程才能申请这样保证了锁的安全
在学习完下面的锁的原理后我们就可以明白是使用swap或exchange指令通过一行汇编的方式来保证自己的原子性的 锁的原理
首先在执行流视角是如何看待CPU上面的寄存器的?
CPU内部的寄存器本质叫做当前执行流的上下文。寄存器的空间是被所有的执行流共享的但是寄存器的内容是被每一个执行流私有的(即上下文)
在汇编的角度如果只有一条汇编语句我们就认为该汇编语句的执行是原子的
而为了实现互斥锁的操作大多数体系结构都提供了swap或exchange指令该指令的作用是把寄存器和内存的数据相交换由于只有一条指令所以保证了原子性
并且我们要知道交换本质就是将共享变为私有因为从内存中交换到了寄存器中也就是变为了线程自己的上下文数据所以变为私有的了
比如内存中锁的数据是1线程初始的寄存器数据是0 线程A进行lock的操作已经执行完寄存器数据和内存数据交换的代码后线程A被切换出去了但是在切换前线程A将寄存器中的1记录下来作为自己的上下文数据这时切换为了线程B执行lock操作 此时线程B也执行到了将寄存器数据和内存数据交换的代码但是此时内存中锁的数据是刚刚线程A交换后的数据也是0所以即使线程B也执行这个代码线程B的寄存器中的数据仍然是0依然无法完成lock的操作只能等待线程A执行完lock操作再执行unlock操作将锁的数据变为1后线程B才能拿到内存中锁的原始数据1进而才能执行lock操作 下面再复习几个概念
可重入概念
可重入是针对于函数来谈的一个函数被多个执行流重复进入的现象就叫做可重入在上面的抢票例子中getticket函数就是被重入了
如果在重入期间如果没有出问题这个被重入的函数就被叫做可重入函数
线程安全概念
线程执行过程中访问了某些全局的某些数据或共享的某些资源可能导致了其他线程出现数据不一致问题、崩溃问题等等就称之为线程安全问题
函数是可重入的那就一定是线程安全的
线程安全的时候函数不一定是可重入的 死锁
死锁就是多线程场景当中持有锁的线程在持有自身锁的同时还向对方申请对方的锁且不释放自己的锁进而导致代码无法向下推进的情况称之为死锁
产生死锁的四个必要条件
即产生死锁时这四个必要条件一定都被满足了
互斥条件一个资源每次只能被一个执行流使用 请求与保持条件一个执行流因请求资源而阻塞时对已获得的资源保持不放 不剥夺条件一个执行流已获得的资源在末使用完之前不能强行剥夺 循环等待条件若干执行流之间形成一种头尾相接的循环等待资源的关系
避免死锁
破坏死锁的四个必要条件 加锁顺序一致 避免锁未释放的场景 资源一次性分配 线程同步
有两个情况没有错误但是却不合理
①单独的某个线程频繁的申请到资源(造成别人饥饿的问题) ②太过于浪费自己与对方的资源(做无用功)
引入线程同步主要是为了解决访问临界资源合理性的问题
按照一定的顺序进行临界资源的访问就叫做线程同步的过程
线程同步也是有方案的
方案一条件变量
当我们申请临界资源前先要做临界资源是否存在的检测而做检测的本质也是访问临界资源的所以对临界资源的检测也一定是需要在加锁和解锁之间的
若是使用常规方式要检测条件就绪则注定了我们必须频繁申请和释放锁
那么如何让我们的线程检测到资源不就绪的时候呢
①不要让线程再频繁的自己检测让线程进行等待 ②当条件就绪的时候通知对应的线程让他来进行资源申请和访问
为了实现上面的效果就需要引入条件变量了
条件变量初始化
条件变量的初始化和互斥锁那里的初始化一样有两种方式
如果定义的是全局的或是静态的条件变量那就可以使用下面的PTHREAD_COND_INITIALIZER这个宏进行初始化
如果定义的是局部的条件变量那就使用pthread_cond_t来定义它用pthread_cond_init来初始化它与互斥锁一样如果是用pthread_cond_init来初始化的那就需要用pthread_cond_destroy来销毁它 pthread_cond_init第一个参数是对应的条件变量第二个参数线程属性设置为nullptr即可
同样pthread系列的函数返回值都是成功返回0失败返回错误码下面的pthread函数也是一样的就不在赘述了 等待
在临界资源中检测对应的临界资源不就绪此时第一件事就是不用频繁的工作而是进行等待而这里的等待就需要用到pthread_cond_wait pthread_cond_wait的第一个参数就是对应的条件变量第二个参数是对应的互斥锁 唤醒
发通知需要用到pthread_cond_signal 其中pthread_cond_broadcast是把所有线程全部唤醒
pthread_cond_signal是唤醒指定的一个线程 条件变量的代码示例
下面使用代码实现主线程随机唤醒四个线程的其中一个且这4个线程执行的任务是不一样的在其中一个线程执行时其他线程在等待队列中等待共执行8次观察打印的顺序
定义局部的互斥锁与条件变量而不定义全局的互斥锁与条件变量是为了更好的理解如何让每一个线程都得到同一个局部的互斥锁与条件变量方法是设置一个Data类将需要传的数据都创建在类中的成员变量然后在线程创建时在pthread_create函数的最后一个参数中传入该类对象的指针这样就可以让每一个函数都能够使用该指针从而得到局部设置的互斥锁和条件变量
如果设置为全局的就不需要给每一个函数传参比较简单
makefile
mycond:mycond.ccg -o $ $^ -stdc11 -lpthread
.PHONY:clean
clean:rm -f mycond mycond.cc代码
#include iostream
#include pthread.h
#include string
#include unistd.husing namespace std;#define TNUM 4
volatile bool quit false;//typedef重命名函数为func_t
typedef void (*func_t)(const string name, pthread_mutex_t* pmtx, pthread_cond_t* pcond);class Data
{
public:Data(const string name, func_t func, pthread_mutex_t* pmtx, pthread_cond_t* pcond):name_(name),func_(func),pmtx_(pmtx),pcond_(pcond){}
public:string name_;func_t func_;pthread_mutex_t* pmtx_;pthread_cond_t* pcond_;};void func1(const string name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{while(!quit){//wait一定要在加锁和解锁之间进行pthread_mutex_lock(pmtx);//pthread_cond_wait代码被执行当前线程会立即被挂起等待被唤醒pthread_cond_wait(pcond, pmtx);cout name 正在读书 endl;pthread_mutex_unlock(pmtx);}
}void func2(const string name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{while(!quit){pthread_mutex_lock(pmtx);//这里需要检测临界资源是否就绪pthread_cond_wait(pcond, pmtx);cout name 正在吃饭 endl;pthread_mutex_unlock(pmtx);}
}void func3(const string name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{while(!quit){pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx);cout name 正在睡觉 endl;pthread_mutex_unlock(pmtx);}
}void func4(const string name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{while(!quit){pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx);cout name 正在玩耍 endl;pthread_mutex_unlock(pmtx);}
}void* Entry(void* args)
{Data* td (Data*)args;td-func_(td-name_,td-pmtx_,td-pcond_);//每一个线程调用完成Entry函数后返回再delete掉new出来的tddelete td;return nullptr;
}int main()
{//设置互斥锁mtx和条件变量condpthread_mutex_t mtx;pthread_cond_t cond;//调用init函数初始化互斥锁和条件变量pthread_mutex_init(mtx, nullptr);pthread_cond_init(cond, nullptr);func_t func[TNUM]{func1, func2, func3, func4};pthread_t tid[TNUM];for(int i 0; i TNUM; i){//为了传入每一个函数的name都与之对应string name thread ;name to_string(i1);Data* td new Data(name, func[i], mtx, cond);pthread_create(tid i, nullptr, Entry, (void*)td);}//执行8次就停止int num 8;//特定的条件变量下去唤醒while(num){cout 线程正在执行倒计时: num-- endl;pthread_cond_signal(cond); sleep(1);}//走到这表示执行完5次了quit设置为true//此时func1234函数不再进入循环cout 执行结束 endl;quit true;//最后再唤醒一次整个程序结束//因为在func函数中wait函数执行完才解锁pthread_cond_broadcast(cond); //线程等待for(int i 0; i TNUM; i){pthread_join(tid[i],nullptr);cout thread tid[i] 已经退出 endl;}//调用init就必须要调用destroypthread_mutex_destroy(mtx);pthread_cond_destroy(cond);return 0;
}线程是按照顺序被唤醒的因为线程在条件变量不满足时所有的线程都会在该条件变量下排队等待所以主线程就会在排队等待的队列中一个一个唤醒线程执行完后继续进入等待队列中wait所以每次都是同一个顺序被唤醒
由于设置了四个线程分别执行func1234函数所以我们在main运行8次观察打印结果 可以发现此次的随机顺序是按2134循环两次
最后执行结束后再全部唤醒一次然后主线程pthread_join等待成功打印已经退出 基于阻塞队列的生产消费模型
关于生产者消费者模型有以下的321原则
3种关系生产者和生产者(竞争/互斥)消费者和消费者(竞争/互斥)生产者和消费者(互斥/同步) 2种角色生产者和消费者 1个交易场所超市
交易场所本质是一个商品的缓冲区是为了提高效率其实是解耦
上述的生产者和消费者是由线程承担的(给线程角色化)
交易场所是某种数据结构表示的缓冲区
而商品则是数据
我们知道在条件满足的时候必须需要唤醒指定的线程而这里是怎么知道条件是否满足呢
其实是生产者消费者自己清楚知道条件是否满足例如超市里是否新增货物肯定生产者最清楚而超市中还剩余多少空间供生产者生产肯定消费者最清楚
所以当生产者生产了商品就表示这里的数据可以被写可以别读取了所以生产者就可以立即通知消费者
同样消费者把数据一拿走消费者知道空间又有了就可以通知生产者继续生产了
因此我们就可以让生产者消费者线程互相同步从而完成生产者消费者模型 阻塞队列的概念
阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
与我们之前学的队列不一样的是当队列为空时从队列获取元素的操作将会被阻塞直到队列中被放入了元素 当队列满时往队列里存放元素的操作也会被阻塞直到有元素被从队列中取出
以上的操作都是基于不同的线程来说的线程在对阻塞队列进程操作时会被阻塞
下面通过代码具体演示阻塞队列用法
其中的lockGuard.hpp是关于锁的封装是RAII风格的加锁方式Task.hpp是一个新的任务类里面有自己的元素用于创建阻塞队列的对象类型
之所以创建lockGuard.hpp进行封装锁是为了美化代码在代码中不再需要显示的加锁解锁只需要创建一个lockGuard的对象该对象自动调用构造函数加锁生命周期结束也会自动调用析构函数解锁 makefile
cp:ConProd.ccg -o $ $^ -stdc11 -lpthread
.PHONY:clean
clean:rm -f cp BlockQueue.hpp
#pragma once#include lockGuard.hpp#include iostream
#include pthread.h
#include mutex
#include queueusing namespace std;const int gDefaultCap 5;templateclass T
class BlockQueue
{
private://判空bool isQueueEmpty(){return bq_.size() 0;}//判满bool isQueueFull(){return bq_.size() capacity_;}public:BlockQueue(int capacity gDefaultCap) : capacity_(capacity){//初始化mtx_、isEmpty_、isFull_pthread_mutex_init(mtx_, nullptr);pthread_cond_init(Empty_, nullptr);pthread_cond_init(Full_, nullptr);}void push(const T in) // 生产者{// pthread_mutex_lock(mtx_);// //1. 先检测当前的临界资源是否能够满足访问条件// // pthread_cond_wait: 是在临界区中并且是持有锁的如果我去等待了锁该怎么办呢// // pthread_cond_wait第二个参数是一个锁当成功调用wait之后传入的锁会被自动释放// // 从哪里阻塞挂起就从哪里唤醒, 被唤醒的时候还是在临界区被唤醒的// // 当我们被唤醒的时候pthread_cond_wait会自动帮助我们线程获取锁// // pthread_cond_wait: 但是只要是一个函数就可能调用失败// // pthread_cond_wait: 可能存在 伪唤醒 的情况// while(isQueueFull()) pthread_cond_wait(Full_, mtx_);// //2. 访问临界资源100%确定资源是就绪的// bq_.push(in);// // if(bq_.size() capacity_/2) pthread_cond_signal(Empty_);// pthread_cond_signal(Empty_);// pthread_mutex_unlock(mtx_);//下面是使用我们自己封装的锁的使用是在lockGuard.hpp中封装的//和上面是等价的lockGuard lockgrard(mtx_); // 自动调用lockGuard构造函数while (isQueueFull())pthread_cond_wait(Full_, mtx_);// 2. 访问临界资源100%确定资源是就绪的bq_.push(in);pthread_cond_signal(Empty_);} // 自动调用lockgrard 析构函数void pop(T *out){//下面这行代码替代了加锁解锁的代码因为会自动调用lockGuard lockguard(mtx_);// pthread_mutex_lock(mtx_);while (isQueueEmpty())pthread_cond_wait(Empty_, mtx_);*out bq_.front();bq_.pop();pthread_cond_signal(Full_);// pthread_mutex_unlock(mtx_);}~BlockQueue(){//调用pthread系列的init都需要调用destroy进行销毁pthread_mutex_destroy(mtx_);pthread_cond_destroy(Empty_);pthread_cond_destroy(Full_);}
private:queueT bq_; //阻塞队列int capacity_;//容量上限pthread_mutex_t mtx_; //通过互斥锁保证队列的安全pthread_cond_t Empty_;//用isEmpty来表示bq是否为空的条件pthread_cond_t Full_; //用isFull来表示bq是否为满的条件
};lockGuard.hpp
#pragma once#include iostream
#include pthread.husing namespace std;class Mutex
{
public:Mutex(pthread_mutex_t *mtx):pmtx_(mtx){}void lock() {cout 正在进行加锁 endl;pthread_mutex_lock(pmtx_);}void unlock(){cout 正在进行解锁 endl;pthread_mutex_unlock(pmtx_);}~Mutex(){}
private:pthread_mutex_t *pmtx_;
};// RAII风格的加锁方式
class lockGuard
{
public:lockGuard(pthread_mutex_t *mtx):mtx_(mtx){mtx_.lock();}~lockGuard(){mtx_.unlock();}
private:Mutex mtx_;
}; ConProd.cc
#include BlockQueue.hpp
#include Task.hpp#include pthread.h
#include unistd.h
#include ctimeusing namespace std;//myAdd函数即是Task.hpp中的func_t函数
int myAdd(int x, int y)
{return x y;
}void* consumer(void *args)
{BlockQueueTask *bqueue (BlockQueueTask *)args;while(true){// 获取任务Task t;bqueue-pop(t);// 完成任务cout pthread_self() consumer: t.x_ t.y_ t() endl;}return nullptr;
}void* productor(void *args)
{BlockQueueTask *bqueue (BlockQueueTask *)args;// int// int a 1;while(true){// 制作任务int x rand()%10 1;//防止x和y创建的间隔太近而随机的数一样usleep会usleep(rand()%1000);int y rand()%5 1;Task t(x, y, myAdd);// 生产任务bqueue-push(t);// 输出消息cout pthread_self() productor: t.x_ t.y_ ? endl;sleep(1);}return nullptr;
}int main()
{//获取随机数srand((uint64_t)time(nullptr) ^ getpid() ^ 0x12345);BlockQueueTask *bqueue new BlockQueueTask();//制造两个生产者两个消费者pthread_t c[2],p[2];pthread_create(c, nullptr, consumer, bqueue);pthread_create(c 1, nullptr, consumer, bqueue);pthread_create(p, nullptr, productor, bqueue);pthread_create(p 1, nullptr, productor, bqueue);//等待pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);delete bqueue;return 0;
} Task.hpp
#pragma once#include iostream
#include functional//使用c11中学习的function
//该函数的返回值是int参数是int,int
typedef std::functionint(int, int) func_t;class Task
{
public:Task(){}Task(int x, int y, func_t func):x_(x), y_(y), func_(func){}//运算符重载//仿函数int operator ()(){return func_(x_, y_);}
public:int x_;int y_;func_t func_;
}; 运行结果为 可以看到代码中加锁解锁的动作
生产者productor加锁构建任务消费者consumer解锁获取完成任务 方案二POSIX信号量
信号量这个概念在前面也提到过。POSIX信号量和SystemV信号量作用相同都是用于同步操作达到无冲突的访问共享资源目的。 不同的是POSIX可以用于线程间同步。
信号量的本质其实是一个计数器
信号量初始化成多少也就代表有多少资源信号量是对于临界资源的一种预定机制只要你申请成功就一定会获得一个共享资源
访问临界资源的时候必须先申请信号量资源(sem--预订资源P操作)使用完毕信号量资源(sem释放资源V操作)
信号量的类型是sem_t和互斥锁、条件变量一样信号量也可以定义全局的或局部的定义以后就可以对信号量进行初始化了
下面先了解信号量的相关接口
初始化信号量
初始化信号量需要用到sem_init函数 需要包含头文件semaphore.h
函数参数
第一个参数信号量对象
第二个参数若为0表示线程间共享非0则表示进程间共享
第三个参数信号量的初始值 销毁信号量
同样使用init初始化就需要使用destroy进行销毁 需要包含头文件semaphore.h
函数参数就是信号量对象 等待信号量(P()) 同样需要包含头文件semaphore.h
函数参数就是信号量对象
等待信号量是P操作会将信号量的值-1 发布信号量(V()) 同样需要包含头文件semaphore.h
函数参数就是信号量对象
发布信号量是V操作会将信号量的值1表示资源使用完毕可以归还资源了 基于环形队列的生产消费模型
上面写的是基于阻塞队列的生产者消费者模型的代码其空间可以动态分配现在基于固定大小的环形队列重写这个生产者消费者模型的代码该生产者消费者模型的代码中就不再全部使用互斥锁控制了而是使用信号量完成其要求
而之所以可以直接使用信号量而不需要互斥锁的原因如下
在之前我们申请锁然后判断与访问临界资源最后释放锁本质是因为我们并不清楚临界资源的情况所以在申请到锁后需要判断是否为空或为满 而信号量的本质是计数器计数器就可以让我们不用进入临界区就可以得知临界资源的情况并且信号量是资源的预定机制表示的就是空间的情况只要能申请成功就一定能够访问所以信号量甚至可以减少临界区内部的判断是否为空或为满语句所以可以在外部就知晓临界资源的情况因此可以使用信号量而不使用锁
而环形结构的特点就是当执行到最后一个下标的位置时再又会回到第一个下标的位置
今天我们使用普通的线性数组实现环形结构只需要用模运算模拟即可
如0~n-1一共n个元素当执行到n及后面的下标时时只需%n即可所以只需要做到[数组下标] % n就可以实现这里的环形结构
在环形结构中有生产者线程和消费者线程如果生产和消费指向了环形结构的同一个位置(就表示为空or为满)此时生产和消费要有互斥或者同步问题
而大部分情况下生产和消费都指向的是不同的位置所以就有下面的想法
当生产和消费指向同一个位置时让他们具有互斥同步关系 而当生产和消费不指向同一个位置时想让他们并发执行
所以在整个过程中有以下期望
生产者不能将消费者套圈否则会出现数据覆盖的情况 消费者不能超过生产者因为超过就没有资源进行消费了
为空时一定要让生产者先运行 为满时一定要让消费者先运行
若是存在其他情况并发访问即可
生产者最关注的是空间资源- spaceSem- 初始值为n 消费者最关注的是数据资源- dataSem - 初始值为0
生产: P(spaceSem) - spaceSem--在特定位置生产V(dataSem) - dataSem 消费: P(dataSem) - dataSem--消费特定的数据V(spaceSem) - spaceSem
多生产多消费的意义
将数据或者任务生产前和拿到之后处理才是最耗费时间的多生产多消费虽然同一时间只能有一个生产者或消费者进入临界区中但是在任务生产前和拿到之后却是可以并发执行的这才是多生产多消费的意义即那任务放任务串行拿完了放完了就是并发的 下面通过代码具体演示阻塞队列的用法
sem.hpp是实现了信号量的封装
makefile:
cp:testMain.ccg -o $ $^ -stdc11 -lpthread
.PHONY:clean
clean:rm -f cp ringQueue.hpp:
#ifndef _RingQueue_HPP_ //防止重复定义
#define _RingQueue_HPP_#include iostream
#include vector
#include ctime
#include stdlib.h
#include unistd.h
#include sys/types.h
#include sem.hppconst int g_default_num 5; using namespace std;templateclass T
class RingQueue
{
public:RingQueue(int default_num g_default_num):rq_(default_num)//rq_的size也初始化为default_num,num_(default_num),p_step_(0),c_step_(0),space_sem_(default_num),data_sem_(0){//初始化锁pthread_mutex_init(pmtx, nullptr);pthread_mutex_init(cmtx, nullptr);}//生产者执行:生产者们的临界资源是下标c_step_void push(const T in){ space_sem_.p();//先申请信号量再申请锁//只有一个生产者线程进入pthread_mutex_lock(pmtx);//将rq_的c_step_下标对应的值改为inrq_[c_step_] in;//模运算实现环形队列的性质c_step_ % num_;pthread_mutex_unlock(pmtx);data_sem_.v();}//消费者执行:消费者们的临界资源是下标p_step_void pop(T* out){data_sem_.p();//先申请信号量再申请锁//只有一个消费者线程进入pthread_mutex_lock(cmtx);*out rq_[p_step_];//模运算实现环形队列的性质p_step_ % num_;pthread_mutex_unlock(cmtx);space_sem_.v();}~RingQueue(){//销毁锁pthread_mutex_destroy(pmtx);pthread_mutex_destroy(cmtx);}
private:vectorT rq_;int num_;int p_step_;//消费者下标int c_step_;//生产者下标Sem space_sem_;//空间信号量Sem data_sem_; //资源信号量pthread_mutex_t pmtx;//生产者和生产者之间的锁pthread_mutex_t cmtx;//消费者和消费者之间的锁
};#endif testMain.cc:
#include ringQueue.hpp//生产者
void* productor(void* args)
{RingQueueint* rq (RingQueueint*)args;while(true){sleep(1);//构建数据或任务对象//x是随机生成的1~100之间的整数int x rand()%100 1;//push推送到环形队列中rq-push(x);cout 线程[ pthread_self() ] 生产: x endl;}
}//消费者
void* consumer(void* args)
{RingQueueint* rq (RingQueueint*)args;while(true){sleep(1);int x;//从环形队列中获取任务或数据rq-pop(x);//进行一定的处理cout 线程[ pthread_self() ] 消费: x endl;}
}int main()
{//生成随机数种子srand((uint64_t)time(nullptr) ^ getpid());pthread_t c[3],p[3];RingQueueint* rq new RingQueueint();//创建线程for(int i 0; i 3; i)pthread_create(c i, nullptr, productor, rq);for(int i 0; i 3; i)pthread_create(p i, nullptr, consumer, rq);//等待线程for(int i 0; i 3; i)pthread_join(c[i], nullptr);for(int i 0; i 3; i)pthread_join(p[i], nullptr);return 0;
} sem.hpp:
#ifndef _SEM_HPP_ //防止重复定义
#define _SEM_HPP_#include semaphore.hclass Sem
{
public://信号量初始化需要给一个初始值valueSem(int value){//初始化信号量sem_init(sem_, 0, value);}//P操作void p(){//sem_wait等待信号量即为P操作sem_wait(sem_);}//V操作void v(){//sem_post等待信号量即为V操作sem_post(sem_);}~Sem(){//销毁信号量sem_destroy(sem_);}
private:sem_t sem_;//信号量
};#endif
在main函数中创建了多生产者多消费者运行结果为 线程池
线程池是一种线程使用模式可以对空间预先申请而空间的预先申请可以减少系统调用的次数提高使用内存的效率它的本质就是用空间换时间的策略
下面是代码简易的实现一个线程池
makefile
mythreadpool:testMain.ccg -o $ $^ -stdc11 -lpthread -DDEBUG_SHOW -D选项可以在命令行中定义宏
.PHONY:clean
clean:rm -f mythreadpool thread.hpp
#pragma once#include iostream
#include unistd.h
#include vector
#include pthread.h
#include queue
#include string
#include cstdiotypedef void*(*func_t)(void*); //设置ThreadData是为了未来创建线程的时候把名字也传进去
class ThreadData
{
public:void* args_;std::string name_;
};class Thread
{
public:Thread(int num, func_t callback, void* args):func_(callback){//构造函数中初始化name_char namebuffer[64];snprintf(namebuffer, sizeof namebuffer, thread-%d, num);name_ namebuffer;tdata_.args_ args;tdata_.name_ name_;}//创建线程void start(){pthread_create(tid_, nullptr, func_, (void*)tdata_);}//线程等待void join(){pthread_join(tid_, nullptr);}std::string name(){return name_;}~Thread(){}
private:std::string name_;func_t func_;ThreadData tdata_;pthread_t tid_;
}; threadPool.hpp
#pragma once#include thread.hpp
#include lockGuard.hpp
#include log.hppconst int g_thread_num_ 3;template class T
class ThreadPool
{
public:// 下面几个函数用于静态函数routine和外部线程取ThreadPool内的成员变量的接口pthread_mutex_t *getMutex(){return lock;}bool isEmpty(){return task_queue_.empty();}// 在条件变量下等 void waitCond(){pthread_cond_wait(cond, lock);}T getTask(){T t task_queue_.front();task_queue_.pop();return t;}public:ThreadPool(int g_num g_thread_num_) : num_(g_num){// 初始化互斥锁和条件变量pthread_mutex_init(lock, nullptr);pthread_cond_init(cond, nullptr);for (int i 1; i num_; i){threads_.push_back(new Thread(i, routine, this));}}// 使用static是因为在上面typedef了func_t函数类型只有一个参数void*// 如果不加static就会在类内定义类内的函数会多一个参数即this指针就会出现类型不匹配的错误了// 并且routine是消费者需要执行的但是消费者需要执行的任务都在task_queue_中静态函数只能访问静态方法// 所以处理方法就是在ThreadPool的构造函数中Thread的第三个参数传入this指针// 这样ThreadPool的内容就会赋值给ThreadData中的args_// 就可以使用routine函数的args强转的指针类型参数访问ThreadPool里的成员函数static void *routine(void *args){ThreadData *td (ThreadData *)args;ThreadPoolT *tp (ThreadPoolT *)td-args_;while (true){T task;// 定义一个代码块{}在这个代码块里lockguard自动加锁出代码块自动解锁即为安全的代码块// 在这个{}中即为加锁的区间临界区代码{lockGuard lockguard(tp-getMutex());while (tp-isEmpty())tp-waitCond();// 走到这里说明不为空可以拿任务了task tp-getTask();}// 处理任务在Task中有()的运算符重载(仿函数)所以直接task()处理即可task(td-name_);}}void run(){for (auto it : threads_){it-start();// std::cout it-name() 启动成功 std::endl;logMessage(NORMAL, %s%s, it-name().c_str(), 启动成功);}}// push任务需要加锁解锁生产void pushTask(const T task){// lockguard自动调用构造函数加锁释放自动调用析构函数解锁lockGuard lockguard(lock);task_queue_.push(task);pthread_cond_signal(cond);}~ThreadPool(){for (auto it : threads_){it-join();delete it;}// 销毁互斥锁和条件变量pthread_mutex_destroy(lock);pthread_cond_destroy(cond);}private:std::vectorThread * threads_;int num_;std::queueT task_queue_;pthread_mutex_t lock;pthread_cond_t cond;
}; Task.hpp任务进行封装
#pragma once#include log.hpp
#include iostream
#include functional
#include stringusing namespace std;typedef functionint(int,int) fun_t;class Task
{
public:Task(){}Task(int x,int y,fun_t func):x_(x),y_(y),func_(func){}//仿函数void operator()(const string name){// cout 线程 name 处理完成结果为: x_ y_ func_(x_, y_) endl;//__FILE__, __LINE__是预处理符可以看到哪个文件哪一行在打印logMessage(WARNING, %s处理完成%d%d%d | %s | %d,name.c_str(), x_, y_, func_(x_,y_), __FILE__, __LINE__);}
public:int x_;int y_;fun_t func_;
}; lockGuard.hpp锁的封装
#pragma once#include iostream
#include pthread.h
using namespace std;class Mutex
{
public:Mutex(pthread_mutex_t *pmtx) : pmtx_(pmtx){}void lock(){pthread_mutex_lock(pmtx_);}void unlock(){pthread_mutex_unlock(pmtx_);}~Mutex(){}private:pthread_mutex_t *pmtx_;
};class lockGuard
{
public:lockGuard(pthread_mutex_t *mtx):mtx_(mtx){mtx_.lock();}~lockGuard(){mtx_.unlock();}private:Mutex mtx_;
};log.hpp日志的打印如果想打印到文件中也可以改变下面的代码使用例如vfprintf的函数
#pragma once#include iostream
#include cstdio
#include cstdarg
#include string
#include ctime//日志等级
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4//日志等级的映射表
const char* gLevelMap[]{DEBUG,NORMAL,WARNING,ERROR,FATAL
};//日志功能至少日志等级 时间支持用户自定义(日志内容等等)
//format是输出格式例如%s之类的
void logMessage(int level, const char* format, ...)
{
//条件编译如果定义了DEBUG_SHOW这个宏在打印时就正常打印
//如果没有定义这个DEBUG_SHOW宏所以level是DEBUG的语句就不再打印了
//我们可以在makefile命令行中定义宏加上-D选项即可
#ifndef DEBUG_SHOWif(level DEBUG) return;
#endif// vprintf/vfprintf/vsprintf/ vsnprintf//是把传入的参数按照可变的方式分别进行显示到 显示器 / 文件 / 字符串 /指定长度的字符串//stdBuffer是日志的标准部分如日志等级 时间//logBuffer是日志的自定义部分如日志内容等char stdBuffer[1024];char logBuffer[1024];//这里的时间采用较为简单的时间戳表示time_t tm time(nullptr);snprintf(stdBuffer, sizeof stdBuffer, [%s] [%ld], gLevelMap[level], tm);va_list args;va_start(args, format);vsnprintf(logBuffer,sizeof logBuffer, format, args);va_end(args);//拼接两个字符串的内容一块打印出来printf(%s %s\n, stdBuffer, logBuffer);
} testMain.cc
#include threadPool.hpp
#include Task.hpp
#include ctime
#include cstdlib
#include unistd.hint main()
{srand((unsigned long)time(nullptr) ^ getpid());ThreadPoolTask* tp new ThreadPoolTask();tp-run();while(true){//制作任务int x rand()%100 1;usleep(1000);int y rand()%50 1;//lambda表达式的使用Task t(x,y,[](int x, int y)-int{return x y;});// cout 任务制作完成 x y ? endl;logMessage(DEBUG, %s:%d%d?, 任务制作完成, x, y);//推送任务到线程池tp-pushTask(t);sleep(1);}return 0;
}运行结果如下所示 如上所示的代码就可以很好地完成线程池的功能了