名词解释 网站内容,学习网站开发教程,优化百度搜索,韶关网站seoLinux系统的多线程 1. Linux线程概念1.1 什么是线程1.2 页表的概念1.2.1 一级页表的缺点1.2.2 二级页表 1.3 线程的优缺点1.4 线程异常1.5 线程用途1.6 Linux进程VS线程 2. Linux线程控制2.1 创建线程2.2 线程ID及地址空间布局2.3 线程终止2.3.1 线程函数处进行return2.3.2 使用… Linux系统的多线程 1. Linux线程概念1.1 什么是线程1.2 页表的概念1.2.1 一级页表的缺点1.2.2 二级页表 1.3 线程的优缺点1.4 线程异常1.5 线程用途1.6 Linux进程VS线程 2. Linux线程控制2.1 创建线程2.2 线程ID及地址空间布局2.3 线程终止2.3.1 线程函数处进行return2.3.2 使用pthread_exit函数2.3.3 pthread_cancel函数 2.4 线程等待2.5 线程分离 3. Linux线程互斥3.1 进程线程间的互斥相关背景概念3.2 互斥量mutex3.3 代码演示3.4 线程不安全的原因 4. 加锁4.1 互斥量的接口4.1.1 初始化互斥量 4.1.2 销毁互斥量4.1.3 互斥量加锁和解锁 4.2 改进代码4.3 锁的本质4.4 对锁进行封装 5. 可重入VS线程安全5.1 重入概念5.2 线程安全 6. 死锁6.1 死锁的概念6.2 死锁的必要条件 7. Linux线程同步7.1 同步概念与竞态条件7.2 条件变量7.3 条件变量相关接口7.3.1 初始化和销毁条件变量7.3.2 阻塞等待条件函数7.3.3 唤醒阻塞等待的条件变量函数 7.4 为什么 pthread_cond_wait 需要互斥锁? 8. 生产者消费者模型8.1 为何要使用生产者消费者模型8.2 生产者消费者模型优点8.3 基于阻塞队列的生产者消费者模型8.4 C模拟阻塞队列的生产消费模型8.5 POSIX信号量8.5.1 什么是POSIX信号量8.5.2 POSIX信号量实现原理8.5.3 POSIX信号量接口函数 8.6 基于环形队列的生产消费模型8.6.1 环形队列8.6.2 环形队列的实现 9. 线程池9.1 基本概念9.2 线程池工作的四种情况9.2.1 主程序当前没有任务要执行线程池中任务队列为空闲状态9.2.2 主程序添加小于等于线程池中线程数量得任务9.2.3 主程序添加任务数量大于当前线程池中线程数量的任务9.2.4 主程序添加任务数量大于当前线程池中线程数量的任务且任务缓冲队列已满 9.3 线程池的实现9.4 对线程进行简单的封装 10. 线程安全的单例模式10.1 单例模式的特点10.2 饿汉实现方式和懒汉实现方式10.3 懒汉方式实现单例模式将线程池该成单例 11. STL、智能指针的线程安全12. 其他常见的各种锁13. 读者写者问题13.1 读者与写者的关系13.2 读写锁的API函数13.3 伪代码理解读写锁的原理13.4 读写锁的演示 1. Linux线程概念
1.1 什么是线程
在一个程序里的一个执行路线就叫做线程thread。更准确的定义是线程是“一个进程内部的控制序列”。一切进程至少都有一个执行线程。线程在进程内部运行本质是在进程地址空间内运行。在Linux系统中在CPU眼中看到的PCB都要比传统的进程更加轻量化。透过进程虚拟地址空间可以看到进程的大部分资源将进程资源合理分配给每个执行流就形成了线程执行流。 前置知识进程的概念和如下的页表概念
1.2 页表的概念
1.2.1 一级页表的缺点 在32位平台下有232个地址也就是一张页表就要有232个映射关系。 每张表的内容出了映射关系外还包含了一些权限相关信息。比如页表分为内核级页表和用户级页表通过权限信息来区分。 每个表项中存储了一个物理地址和虚拟地址这里一共要占用8个字节再考虑权限相关的信息这里粗略地认为每个表项总共占用了10个字节。
那么总共就需要232 * 10 字节也就是40GB的大小。 在32位平台下最大的内存也就仅有4GB说明这种页表映射的方式是不合理的。
1.2.2 二级页表
在Linux中的处理方式是建立一个二级页表。 实现方法
虚拟地址前10个比特位在页目录中进行查找找到相应的页表。再拿10个比特位在对应页表中进行查询找到物理内存中对应页框的起始地址。将最后12个比特位作为偏移量从页框对应地址处向后偏移找到物理内存中的某一个对应字节数据。对应页框的起始地址20个比特位 虚拟地址的最后12个比特位就能够定位任意一个内存字节地址。 物理地址是以“块”为单位的这个块的大小就是4KB也就是212对应了偏移量最大值。
这就是二级页表的结构页目录就是一个一级页表而表项就是一个二级页表。
计算页表的总大小 首先只用了20个比特位来建立映射关系那么最大也就是220个字节也就是1MB。在页表中左边占了10个比特位而右边占了20个比特位共30个比特位这里假设加起来一共占了32个比特位方便计算也就是4个字节。那么总大小就是220 * 4 byte 4MB。
映射过程是由MMU这个硬件完成的页表是一种软件映射MMU是一种硬件映射。 MMU是Memory Management Unit的缩写中文名是内存管理单元有时称作分页内存管理单元英语paged memorymanagementunit缩写为PMMU。它是一种负责处理中央处理器CPU的内存访问请求的计算机硬件。它的功能包括虚拟地址到物理地址的转换即虚拟内存管理、内存保护、中央处理器高速缓存的控制在较为简单的计算机体系结构中负责总线的仲裁以及存储体切换 1.3 线程的优缺点
优点
创建一个新线程的代价要比创建一个新进程小得多与进程之间的切换相比线程之间的切换需要操作系统做的工作要少很多线程占用的资源要比进程少很多能充分利用多处理器的可并行数量在等待慢速I/O操作结束的同时程序可执行其他的计算任务计算密集型应用为了能在多处理器系统上运行将计算分解到多个线程中实现I/O密集型应用为了提高性能将I/O操作重叠。线程可以同时等待不同的I/O操作。
缺点
性能损失 一个很少被外部事件阻塞的计算密集型线程往往无法与共它线程共享同一个处理器。如果计算密集型线程的数量比可用的处理器多那么可能会有较大的性能损失这里的性能损失指的是增加了额外的同步和调度开销而可用的资源不变。 健壮性降低 编写多线程需要更全面更深入的考虑在一个多线程程序里因时间分配上的细微偏差或者因共享了不该共享的变量而造成不良影响的可能性是很大的换句话说线程之间是缺乏保护的。 缺乏访问控制 进程是访问控制的基本粒度在一个线程中调用某些OS函数会对整个进程造成影响。 编程难度提高 编写与调试一个多线程程序比单线程程序困难得多
1.4 线程异常
单个线程如果出现除零野指针问题导致线程崩溃进程也会随着崩溃线程是进程的执行分支线程出异常就类似进程出异常进而触发信号机制终止进程进程终止该 进程内的所有线程也就随即退出
1.5 线程用途
合理的使用多线程能提高CPU密集型程序的执行效率合理的使用多线程能提高IO密集型程序的用户体验如生活中我们一边写代码一边下载开发工具就是多线程运行的一种表现
1.6 Linux进程VS线程
进程是资源分配的基本单位。线程是调度的基本单位。线程共享进程数据但也拥有自己的一部分数据 线程ID。一组寄存器。栈。errno。信号屏蔽字。调度优先级
进程的多个线程共享 同一地址空间,因此Text Segment、Data Segment都是共享的,如果定义一个函数,在各线程中都可以调用,如果定义一个全局变量,在各线程中都可以访问到,除此之外,各线程还共享以下进程资源和环境:
文件描述符表。每种信号的处理方式(SIG_ IGN、SIG_ DFL或者自定义的信号处理函数)。当前工作目录。用户id和组id。
进程和线程的关系如下图
2. Linux线程控制
在Linux系统的的视角下Linux下没有真正意义的线程而是用进程模拟的线程LWP轻量级进程所以Linux不会提供直接创建线程的系统调用最多提供创建轻量级进程的接口。
但是对于用户来说用户需要的是线程接口。 所以Linux提供了用户线程库对下将Linux接口封装对上给用户提供进行线程控制的接口也就是pthread库原生线程库
PROSIX线程库
与线程有关的函数构成了一个完整的系列绝大多数函数的名字都是以“pthread_”打头的。要使用这些函数库要通过引入头文件pthread.h链接这些线程函数库时要使用编译器命令的“-lpthread”选项
2.1 创建线程
函数原型
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);函数功能创建一个新的线程 参数
thread线程IDattr设置线程的属性attr为NULL表示使用默认属性start_routine是个函数地址线程启动后要执行的函数arg传给线程启动函数的参数
返回值成功返回0失败返回错误码
错误检查
传统的一些函数是成功返回0失败返回-1并且对全局变量errno赋值以指示错误。pthreads函数出错时不会设置全局变量errno而且大部分其他POSIX函数也会这样做而是将错误代码通过返回值返回。pthreads同样也提供了线程内的errno变量以支持其它使用errno的代码。对于pthreads函数的错误建议通过返回值来判定因为读取返回值要比读取线程内的errno变量的开销更小。
1代码示例创建一个新线程
#include iostream
#include pthread.h
#include unistd.h
using namespace std;void* threadRun(void* args)
{while(1){cout new thread running : getpid() endl;sleep(1);}
}int main()
{pthread_t pid;pthread_create(pid, nullptr, threadRun, nullptr);while(1){cout main thread running : getpid() endl;sleep(1);}return 0;
}2运行结果 由打印结果可以看到主线程和新线程都打印了相应的字符串。 使用
ps -aL | head -1 ps -aL | grep test1命令查看执行的线程test是C可执行文件 2.2 线程ID及地址空间布局
pthread_ create函数会产生一个线程ID存放在第一个参数指向的地址中。该线程ID和前面说的线程ID不是一回事。前面讲的线程ID属于进程调度的范畴。因为线程是轻量级进程是操作系统调度器的最小单位所以需要一个数值来唯一表示该线程。pthread_ create函数第一个参数指向一个虚拟内存单元该内存单元的地址即为新创建线程的线程ID属于本地线程库的范畴。线程库的后续操作就是根据该线程ID来操作线程的。本地线程库提供了pthread_ self函数可以获得线程自身的ID
pthread_t pthread_self(void);1代码示例
#include iostream
#include pthread.h
#include unistd.h
using namespace std;void* threadRun(void* args)
{while(1){cout new thread running : getpid() 线程id : pthread_self() endl;sleep(1);}
}int main()
{pthread_t pid;pthread_create(pid, nullptr, threadRun, nullptr);while(1){cout main thread running : getpid() endl;sleep(1);}return 0;
}2运行结果 pthread_t 到底是什么类型呢取决于实现。对于Linux目前实现的本地线程库实现而言pthread_t类型的线程ID本质就是一个进程地址空间上的一个地址。
下图中的mmap区域是共享区 2.3 线程终止
只终止某个线程而不是终止整个进程可以有三种方法
线程函数处进行return。线程可以自己调用pthread_exit函数终止自己。一个线程可以调用pthread_cancel函数终止同一进程中的另一个线程。
2.3.1 线程函数处进行return
注意在线程中使用return代表该线程退出而在main函数主线程中使用return代表整个进程退出。
#include iostream
#include pthread.h
#include unistd.h
using namespace std;void* threadRun(void* args)
{while(1){cout new thread running : getpid() 线程id : pthread_self() endl;sleep(1);}return nullptr;
}int main()
{pthread_t pid;pthread_create(pid, nullptr, threadRun, nullptr);return 0;
}该代码并不会打印新线程中的字符串因为主线程退出整个进程都终止了。
2.3.2 使用pthread_exit函数
void pthread_exit(void *retval);函数功能线程终止 参数
retval线程退出时的退出码信息
返回值无返回值跟进程一样线程结束的时候无法返回到它的调用者自身
1pthread_exit 或者 return 返回的指针所指向的内存单元必须是全局的或者是 malloc 分配的不能在线程函数的栈上分配因为当其他线程得到这个返回指针时线程函数已经退出了。
#include iostream
#include pthread.h
#include unistd.h
using namespace std;void* threadRun(void* args)
{int cnt 5;while(cnt--){cout new thread running : getpid() 线程id : pthread_self() endl;sleep(1);}pthread_exit((void*)1);
}int main()
{pthread_t pid;pthread_create(pid, nullptr, threadRun, nullptr);void *ret nullptr;// pthread_join表示线程等待主线程执行完后还需等待其他线程pthread_join(pid, ret); // 这里会把线程退出码信息通过该函数给retcout new thread exit code is : (int64_t)ret endl;// 这里使用int64_t强制转换是因为平台下Linux的指针是8字节的。return 0;
}2运行结果 在线程等待的情况下新线程在5秒后结束了并返回了线程退出码。
2.3.3 pthread_cancel函数
int pthread_cancel(pthread_t thread);功能取消一个执行中的线程。 参数
thread线程ID。
返回值成功返回0失败返回错误码。
1线程是可以取消自己的甚至新线程也可以取消主线程取消成功的线程的退出码一般是 -1。
#include iostream
#include pthread.h
#include unistd.h
using namespace std;void* threadRun(void* args)
{int cnt 5;while(cnt--){cout new thread running : getpid() 线程id : pthread_self() endl;sleep(1);}pthread_exit((void*)1);
}int main()
{pthread_t pid;pthread_create(pid, nullptr, threadRun, nullptr);sleep(3);//取消新线程pthread_cancel(pid);void *ret nullptr;// pthread_join表示线程等待主线程执行完后还需等待其他线程pthread_join(pid, ret); // 这里会把线程退出码信息通过该函数给retcout new thread exit code is : (int64_t)ret endl;// 这里使用int64_t强制转换是因为平台下Linux的指针是8字节的。return 0;
}2运行结果 2.4 线程等待
1为什么需要线程等待
已经退出的线程其空间没有被释放仍然在进程的地址空间内。创建新的线程不会复用刚才退出线程的地址空间。
2线程等待函数
int pthread_join(pthread_t thread, void **retval);功能等待线程结束。 参数
thread被等待线程的ID。retval线程退出时的退出码信息。
返回值线程等待成功返回0失败返回错误码。
调用该函数的线程将挂起等待直到id为thread的线程终止。thread线程以不同的方法终止通过pthread_join得到的终止状态是不同的总结如下:
如果thread线程通过return返回retval 所指向的单元里存放的是thread线程函数的返回值。如果thread线程被别的线程调用 pthread_cancel 异常终止掉retval所指向的单元里存放的是常数PTHREAD_CANCELED就是-1。如果thread线程是自己调用 pthread_exit 终止的retval 所指向的单元存放的是传给pthread_exit 的参数。如果对thread线程的终止状态不感兴趣可以传 NULL给retval参数。 3代码演示让新线程创建5s后退出随后再过几秒后被thread_join等待当主进程开始打印消息时说明新线程join等待完成
#include iostream
#include pthread.h
#include unistd.h
using namespace std;void* thread_rum(void* args)
{const char* name static_castconst char*(args);int cnt 5;while (true){printf(%s 正在运行, thread id: 0x%x\n, name, pthread_self());sleep(1);if (!(cnt--)){break;}}cout 线程退出啦.... endl;return nullptr;
}int main()
{pthread_t pit;int n pthread_create(pit, nullptr, thread_rum, (void*)new thread);sleep(3);pthread_join(pit, nullptr);cout main thread join success endl;sleep(3);while(1){cout main thread pthread_self() endl;sleep(1);}return 0;
}运行结果 可以使用如下脚本来监控线程运行状况
[xiaomakerVM-28-13-centos test_12_07]$ while :; do ps -aL | head -1 ps -aL | grep mythread; sleep 1; done通过如上代码可以发现当创建线程后线程1正在运行5s后新线程退出了我们的监控脚本观察到线程由两个变成了一个但是正常情况下预期应该是两个线程随后线程等待成功这里还是只能看到一个线程。不是说好退出后应该看到的是两个线程吗事实上一个线程退出后我们并没有看到预期结果。原因是ps命令在查的时候退出的线程是不给你显示的所以你只能看到一个线程。但是现在不能证明当前的新线程在退出没有被join的时候就没有内存泄漏。
所以线程退出的时候一般必须要进行join如果不进行join就会造成类似于进程那样的内存泄漏问题。
4线程异常的问题
野指针代码演示
#include iostream
#include pthread.h
#include unistd.h
using namespace std;void* thread_rum(void* args)
{const char* name static_castconst char*(args);int cnt 5;while (true){printf(%s 正在运行, thread id: 0x%x\n, name, pthread_self());sleep(1);if (!(cnt--)){int* p nullptr;*p 100;break;}}cout 线程退出啦.... endl;return (void*)10;
}int main()
{pthread_t pit;int n pthread_create(pit, nullptr, thread_rum, (void*)new thread);void *ret nullptr;pthread_join(pit, ret);cout main thread join success, *ret: (long long)ret endl;while(1){cout main thread pthread_self() endl;sleep(1);}return 0;
}运行结果 此时会发现待线程出现野指针问题时左边会显示段错误而右边监控脚本中的线程直接就没了。此时就说明当线程异常了那么整个进程整体异常退出线程异常 进程异常。所以线程会影响其它线程的运行 —— 线程的健壮性鲁棒性较低。
5如何理解第二个参数retval 参数retval是线程退出时的退出码这是一个二级指针一个输出型参数。刚刚我们的代码中以及涉及到了线程退出的方式从线程函数return。退出的类型是void*这里我们把先前退出返回的nullptr改为(void*)10。 此线程退出后我们是通过pthread_join函数获得此线程的退出结果退出结果是void*类型可retval是void**类型我们需要传入一个二级指针。下面演示获得此线程的退出结果的过程并打印此退出码代码如下
#include iostream
#include pthread.h
#include unistd.h
using namespace std;void* thread_rum(void* args)
{const char* name static_castconst char*(args);int cnt 5;while (true){printf(%s 正在运行, thread id: 0x%x\n, name, pthread_self());sleep(1);if (!(cnt--)){break;}}cout 线程退出啦.... endl;return (void*)10;
}int main()
{pthread_t pit;int n pthread_create(pit, nullptr, thread_rum, (void*)new thread);void *ret nullptr;pthread_join(pit, ret);cout main thread join success, *ret: (long long)ret endl;while(1){cout main thread pthread_self() endl;sleep(1);}return 0;
}运行结果 这里我们就得到了新线程退出时的退出码 10。综上ptherad_join的第二个参数retval的作用就是一个输出型参数获取新线程退出时的退出码。我们先前讲过进程退出时分为三种情况
代码跑完结果正确代码跑完结果不正确异常
在线程退出时代码跑完结果不正确和结果正确都可以得到退出码但是线程异常时并不会出现退出码。那么为什么异常时主线程没有获取新线程退出时的信号呢
这是因为线程出异常就不再是线程的问题而是进程的问题应该让父进程获取退出码知道它什么原因退出的。因此线程终止时只需考虑正常终止。
其实线程终止有3种方法见上文。
2.5 线程分离
默认情况下新创建的线程是joinable的线程退出后需要对其进行pthread_join操作否则无法释放资源从而造成系统泄漏。如果不关心线程的返回值join是一种负担这个时候我们可以告诉系统当线程退出时自动释放线程资源。
int pthread_detach(pthread_t thread);可以线程组内其他线程对目标线程进行分离也可以是线程自己分离。 当一个线程分离后是不能够被join的。
1错误使用示例
#include iostream
#include pthread.h
#include unistd.h
#include cstring
using namespace std;void* thread_run(void* args)
{char* name static_castchar*(args);int cnt 5;while(cnt){cout name : cnt -- endl;}return nullptr;
}int main()
{pthread_t pid;pthread_create(pid, nullptr, thread_run, (void*)thread_1);pthread_detach(pid);//error,线程分离后不能joinint n pthread_join(pid, nullptr);if(n ! 0){cerr error : n : strerror(n) endl; }return 0;
}2这里可能会出现两种情况一种是线程1先执行完再提示出main函数里的错误打印。一种是直接错误打印。 原因是线程之间谁先执行是不确定的。假设线程1先执行因为代码没有sleep函数执行完也就一瞬间的事情。线程1执行完后接下来是主线程执行主线程这时候才去判断有没有join所以会导致最后才打印错误信息。
假设主线程先执行发现线程分离了以后还join了所以程序之间报错结束程序。
3正确使用示例
#include iostream
#include pthread.h
#include unistd.h
#include cstring
using namespace std;void* thread_run(void* args)
{char* name static_castchar*(args);int cnt 5;while(cnt){cout name : cnt -- endl;}return nullptr;
}int main()
{pthread_t pid;pthread_create(pid, nullptr, thread_run, (void*)thread_1);pthread_detach(pid);usleep(1000);return 0;
}这里反复运行发现会有两种情况一种是主线程先执行主线程执行结束直接return,进程直接结束。另外一种是线程1先执行再到主线程。
这里主要是想说明线程分离并不影响“主线程结束导致其他线程被迫退出”的情况。线程分离后主线程执行结束各线程会主动释放空间避免僵尸进程的情况。
4让主线程的休眠时间大于新线程即可让新线程先打印
#include iostream
#include pthread.h
#include unistd.h
#include cstring
using namespace std;void* thread_run(void* args)
{char* name static_castchar*(args);int cnt 5;while(cnt){cout name : cnt -- endl;}return nullptr;
}int main()
{pthread_t pid;pthread_create(pid, nullptr, thread_run, (void*)thread_1);pthread_detach(pid);sleep(5);return 0;
}5运行结果 3. Linux线程互斥
3.1 进程线程间的互斥相关背景概念
临界资源多线程执行流共享的资源就叫做临界资源。临界区每个线程内部访问临界资源的代码就叫做临界区。互斥任何时刻互斥保证有且只有一个执行流进入临界区访问临界资源通常对临界资源起保护作用。原子性后面讨论如何实现不会被任何调度机制打断的操作该操作只有两态要么完成要么未完成。
3.2 互斥量mutex
大部分情况线程使用的数据都是局部变量变量的地址空间在线程栈空间内这种情况变量归属单个线程其他线程无法获得这种变量。但有时候很多变量都需要在线程间共享这样的变量称为共享变量可以通过数据的共享完成线程之间的交互。多个线程并发的操作共享变量会带来一些问题。
3.3 代码演示
1写个代码体验线程不安全
#include iostream
#include cstdio
#include cstring
#include unistd.h
#include pthread.h
#include vectorusing namespace std;int ticket 3000;class threadData
{
public:threadData(int number){threadname thread- to_string(number);}public:string threadname;
};void* getTicket(void* args)
{threadData *td static_castthreadData *(args);const char *name td-threadname.c_str();while (1){if(ticket 0){usleep(1000);printf(who%s, get a ticket: %d\n, name, ticket);ticket--;}else{break;}usleep(13);}printf(%s ... quit\n, name);return nullptr;
}int main()
{vectorpthread_t tids;vectorthreadData* thread_datas;for(int i 1; i 4; i){pthread_t tid;threadData* td new threadData(i);thread_datas.push_back(td);pthread_create(tid, nullptr, getTicket, thread_datas[i - 1]);tids.push_back(tid);}for (auto thread : tids){pthread_join(thread, nullptr);}for (auto td : thread_datas){delete td;}return 0;
}全局变量tickets表示票的数量。主线程中创建了4个新线程代表4个用户去抢票。每个新线程在抢到票后将票数减一并且打印出票的数量。当票被抢完以后线程退出不再进行抢票。
2运行结果 运行以后发现出现了负数票这不合理票抢完就应该停止了包括我们的代码逻辑都是这样写的但是此时就出现了这种情况。 其实上面现象的原因是发生了线程不安全问题。
3如何产生线程不安全现象 上面现象是故意弄出来的涉及到了线程调度利用了线程调度的特性造出了一个这样的现象。要想出现上面的现象就需要
尽可能让多个线程交叉执行。多个线程交叉执行的本质就是让调度器尽可能的频繁发生线程调度与切换。
虽然看起来是多个线程在同时运行但这是由于CPU运行速度太快导致的实际上CPU是一个线程一个线程执行的。现在就是要让CPU频繁调度不停的切换线程一个线程还没有执行完就再执行下一个每个线程都执行一点这样交叉执行。
当一个线程进行延时的时候CPU并不会等它而是会将它放在等待队列里然后去执行另一个线程等延时线程醒来以后才会接着执行。
线程在时间片到来更高优先级线程到来线程等待的时候会发生线程切换。线程是在从内核态转换成用户态的时候检测是否达到线程切换的条件的。
线程检测是否切换是以内核态的身份去检测的执行的是3~4G内核空间中的代码本质上是操作系统在检测。
4产生线程不安全现象的原因
①假设tickets已经只剩一张了即全局变量tickets 1。 主线程创建好4个新线程以后4个新线程便开始执行了在执行到延时的时候新线程就会被放在等待队列里。
②CPU及内核if判断的本质逻辑
从内存中读取数据到CPU寄存器。进行判断。 在线程user1执行到if判断时CPU从内存中将tickets变量中的数据1拿到了CPU的寄存器ebx中。CPU进行判断后发现符合大于0的条件。
③当线程user1符合条件继续向下执行延时代码时CPU将线程user1切走了换上了user2。
在线程user1被切走的时候它的上下文数据也会被切走。所以ebx寄存器中的1也会跟着user1的PCB被切走。
user2被调度时仍然重复user1的过程执行延时被切走再换上user3以此类推直到user4被切走。
四个线程都拿到了tickets1所以符合条件都能向下执行。当user4被挂起后就会轮到user1进行调度了。
④user1唤醒以后接着被切走的位置继续执行 执行tickets - - 的本质
从内存中读取数据到CPU的寄存器更改数据写回数据到内存中
虽然C/C代码只有一条语句但是汇编后至少有3条语句。
user1执行tickets- - 以后抢票成功了并且将抢票后的tickets0写回到了内存中。
⑤此时user2醒来了同样接着它被切走的位置继续执行此时user2回来后认为tickets1所以就向下执行了 当执行tickets减减时仍然需要三步
从内存中读取tickets0到CPU寄存器ebx中。修改值从0变成-1。将-1写回内存中。
当user2执行完后user3和user4醒来同样继续向下执行重复上面的过程仍然对tickets减一所以导致结果不合理。
3.4 线程不安全的原因
1只存在两个线程对全局变量tickets仅作减减操作 线程A先被CPU调度进行减减操作。
从内存中将tickets1000取到寄存器ebx中。进行减减操作tickets变成了999。在执行第三步写回数据之前线程A被切走了。
2线程A切走的同时它的上下文也就是tickets999也被切走了。 线程B此时被调度线程A在等待队列。
线程B先从内存中读取tickets 1000到寄存器ebx中。进行减减操作。将减减后的值写回到内存中。线程B将减减操作完整的执行了很多遍直到tickets200时才被切下去。
3线程B被切走以后线程A又接着被调度。 线程A接着被切走的位置开始执行也就是执行减减的第三步操作—写回。
线程A被调度后先恢复上下文将被切走时的tickets999恢复到了ebx寄存器中。然后执行第三步将tickets999写回到了内存中。
线程B辛辛苦苦将tickets从1000减到了200线程A重新被调度后直接将tickets又从200写回到了999。
上面这种现象被叫做数据不一致问题。 导致数据不一致问题的原因共享资源没有被保护多线程对该资源进行了交叉访问。
而解决数据不一致问题的办法就是对共享资源加锁。
4. 加锁
4.1 互斥量的接口
4.1.1 初始化互斥量
初始化互斥量有两种方法
方法1静态分配全局变量
pthread_mutex_t mutex PTHREAD_MUTEX_INITIALIZER方法2动态分配
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);参数
mutex创建的互斥锁指针。attr锁的属性一般情况下设为nullptr。
返回值初始化成功返回0失败返回错误码。 函数功能将创建的锁初始化。
4.1.2 销毁互斥量 销毁互斥量需要注意 使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁。不要销毁一个已经加锁的互斥量。已经销毁的互斥量要确保后面不会有线程再尝试加锁。 销毁互斥量接口
int pthread_mutex_destroy(pthread_mutex_t *mutex)参数
mutex创建的互斥锁指针。
返回值销毁成功返回0失败返回错误码。 函数功能当锁使用完后必须进行销毁。
4.1.3 互斥量加锁和解锁
加锁接口
int pthread_mutex_lock(pthread_mutex_t *mutex);参数
mutex创建的互斥锁指针。
返回值加锁成功返回0失败返回错误码。 函数功能给临界区加锁让多线程串行访问临界资源。
调用 pthread_mutex_lock 时可能会遇到以下情况
互斥量处于未锁状态该函数会将互斥量锁定同时返回成功发起函数调用时其他线程已经锁定互斥量或者存在其他线程同时申请互斥量但没有竞争到互斥量那么pthread_ lock调用会陷入阻塞(执行流被挂起)等待互斥量解锁。
解锁接口
int pthread_mutex_unlock(pthread_mutex_t *mutex);参数
mutex创建的互斥锁指针。
返回值解锁成功返回0失败返回错误码。 函数功能解锁让多线程恢复并发执行。
锁其实起一个区间划分的作用在加锁和解锁之间的代码就是临界区多个执行流只能串行执行临界区代码从而保护公共资源使之成为临界资源。
pthread_mutex_t lock PTHREAD_MUTEX_INITIALIZER;pthread_mutex_lock(lock);
//临界区
//...
pthread_mutex_unlock(lock);加锁和解锁两句代码圈定了临界区的范围。
4.2 改进代码
1现在将上面的代码加上锁看看是否还会出现多线程数据不一致问题 在主线程中创建一个互斥锁并且初始化在所有新线程等待成功后将锁释放。
2但是此时的锁是存在于主线程的栈结构中需要让所有新线程看到这把锁。 3在线程数据结构体中再增加一个锁指针此时所有线程就都能看到这把锁了。 在新线程中对临界区加锁和解锁让所有线程串行执行临界区中代码。
解锁不能放在else的代码块后面防止break出循环但是没有解锁。在else的break前也要有解锁防止if条件不满足直接跳出循环没有解锁。
最后运行结果 此时抢票的结果是正常了最终抢到1结束符合我们的预期。
4抢票的速度比以前慢了好多。
加锁和解锁的过程是多个线程串行执行的并且临界区的代码也是串行执行的所以速度就变慢了。
5我们可以发现只有3号进程在抢票其他线程没有抢。
锁只规定了互斥访问并没有规定必须让谁先执行。锁是让多个执行流进行竞争的结果。
只有3号进程在执行说明3号进程的竞争能力强别的线程抢不过它。因为现在的抢票逻辑是抢到票以后立马释放然后就又立马申请锁了所以之前持有锁的线程更加容易再次申请到锁。
6实际上抢票成功后不可能立刻再去抢还需要做一些工作比如给用户打印订单等等。 在抢票成功后延时13微秒代表线程做的后续工作。
运行结果 此时就成了多个线程在一起抢票。
当一个线程从临界区中出来并且释放锁后执行后续任务时其他线程才有能力去竞争锁。 加锁后的代码结构上如上图所示。
加锁时一定要保证临界区的粒度非常小。将那些不是必须放在临界区中的代码放在临界区外。加锁是程序员行为要加锁就所有线程都加锁否则就起不到保护共享资源的效果。
4.3 锁的本质
我们应该如何看待锁
在上面代码中一个锁必须让所有线程都看到所以锁本身就是一个共享资源。
既然是共享资源锁也必须是安全的那么是谁来保证锁的安全性呢
锁是通过加锁和解锁是原子的来保证自身的安全的。
1一个线程如果申请成功锁那么它就会继续向下执行如果暂时申请不成功呢如下代码 运行结果 查看后台线程和进程 此时代码就被阻塞住了线程和进程都是存在的。
一个锁只能被申请一次只有锁被释放后才能再次申请。
当一个线程申请锁暂时失败以后就会阻塞不动。 当多个线程在执行上面这部分代码。
当一个线程申请锁成功进入临界区访问临界资源其他线程要想进入临界区只能阻塞等待等锁释放。当一个线程申请锁成功进入临界区访问临界资源同样是能被切走的而且该线程是抱着锁走的其他线程仍然无法申请锁成功。
操作系统内部并不存在锁的概念所以调度器在调度轻量级进程的时候并不会考虑是否有锁。
所以站在其他线程的角度锁只有两种状态
申请锁前申请锁后
站在其他线程的角度看到当前持有锁的过程就是原子的。
2加锁解锁的原理 经过上面的例子我们认识到一个事实c/c中加加和减减的操作并不是原子的所以会导致多线程数据不一致的问题。
而为了能让加锁过程是原子的在大多数体系结构了都提供了swap或者xchange汇编指令通过一条汇编指令来保证加锁的原子性。
加锁解锁的代码
lock:movb %al, $0xchange %al, mutexif(al寄存器的内容 0){return 0;}else{挂起等待; }goto lock;unlock:movb mutex, $1唤醒等待mutex的线程;return 0;加锁过程中xchange是原子的可以保证锁的安全。
3如上代码解析图
①假设现在有两个线程ThreadA和ThreadB 线程A在执行线程B在等待线程A加锁时的第一步就是将0写入到al寄存器中。
在执行完第一条汇编后线程A是可以被切走的而且在切走的时候会将它的上下文也就是al中的0带走。这一步的本质就是将0写入到线程A的上下文中。
② 线程A在执行下步的时候直接将内存中mutex中的数据交换到了al寄存器中。 在执行完交换的时候线程A同样可以被切走而且是带着上下文走的也就是会将al中的mutex带走。
交换的本质就是将锁交换到线程A的上下文中。
③现在线程A被切走了而且带走了它的上下文mutex 线程B在执行的时候先第一步给寄存器al写0然后执行第二步交换锁和al中的值。
此时al中的值虽然交换了但是仍然是0根据上面的伪代码if条件不成立所以将线程B挂起等待了。
④此时操作系统就会又将线程A调回来继续执行 线程A做的第一件事情就是恢复上下文将锁恢复到al寄存器中。
线程A执行下一步时if条件成了所以该线程就申请锁成功了。
经过上面过程的描述我们可以发现锁只能被一个线程持有而且由于xchange汇编只有一条指令即使申请锁的过程被切走也不怕。
一旦一个线程通过xchage拿到了锁即使它被切走也是抱着锁走的其他线程是无法拿到锁的只有等它将锁释放。
只有持有锁的线程才能执行下去锁相当于一张入场卷。
这样来看释放锁的过程其实对原子性的要求并没有那么高因为释放锁的线程必定是持有锁的线程不持有锁的线程都不会执行到这里都在阻塞等待。 线程A在解锁时仅是将内存中存放锁的变量写为1此时其他线程在xchange以后就可以通过if条件判断申请锁了。
虽然解锁对原子性的要求不是很必要但是在设计上还是要设计成原子的可以看到解锁也是只通过一条汇编就搞定了。
注意上图中锁中的变量1仅仅是表示锁存在并不是真正意义上的数字1。
4.4 对锁进行封装
为了更好的使用C像封装线程那样将加锁也封装成一个小组件方便我们后面使用。
mutex.hpp
#include iostream
#include pthread.husing namespace std;//pthread_mutex_t lock PTHREAD_MUTEX_INITIALIZER;templateclass T
class Mutex
{
public:Mutex(pthread_mutex_t* lock nullptr):_lock(lock){}void lock(){pthread_mutex_lock(_lock);}void unlock(){pthread_mutex_unlock(_lock);}private:pthread_mutex_t* _lock;
};templateclass T
class LockGuard
{
public:LockGuard(pthread_mutex_t* mutex):_mutex(mutex){_mutex.lock();}~LockGuard(){_mutex.unlock();}private:MutexT _mutex;
};只需要创建一个LockGurd对象就可以进行加锁需要传入锁的地址在LockGuard对象生命周期结束的时候会自动释放锁。 创建一个全局的锁就不用使用pthread_mutex_init取初始化也不用使用pthread_mutex_destroy来销毁锁了直接使用就行。 在临界区加锁执行完临界区代码后解锁。
将临界区放在一个代码块中此时LockGuard的生命周期就是这个代码块。创建LockGuard对象时在构造函数中自动加锁出作用域时析构函数自动解锁。
运行结果 使用封装的加锁小组将抢票的结果和我们之前直接用系统调用加锁是一样的。
上面这种加锁的风格被称为RAII加锁。
test.cpp代码
#include mutex.hppint ticket 1000;class threadData
{
public:threadData(int number, pthread_mutex_t* lock):_lock(lock){threadname thread- to_string(number);}public:string threadname;pthread_mutex_t* _lock;
};pthread_mutex_t lock PTHREAD_MUTEX_INITIALIZER;void* getTicket(void* args)
{threadData *td static_castthreadData *(args);const char *name td-threadname.c_str();while (1){LockGuardint getlock(lock);//pthread_mutex_lock(lock);if(ticket 0){usleep(1000);printf(who%s, get a ticket: %d\n, name, ticket);ticket--;//pthread_mutex_unlock(lock);}else{//pthread_mutex_unlock(lock);break;}usleep(13);}printf(%s ... quit\n, name);return nullptr;
}int main()
{pthread_mutex_t lock; //创建锁pthread_mutex_init(lock, nullptr); //初始化锁 vectorpthread_t tids;vectorthreadData* thread_datas;for(int i 1; i 4; i){pthread_t tid;threadData* td new threadData(i, lock); //将锁传递给线程thread_datas.push_back(td);pthread_create(tid, nullptr, getTicket, thread_datas[i - 1]);tids.push_back(tid);}for (auto thread : tids){pthread_join(thread, nullptr);}for (auto td : thread_datas){delete td;}//pthread_mutex_destroy(lock); //销毁锁return 0;
}5. 可重入VS线程安全
5.1 重入概念
1重入同一个函数被不同的执行流调用当前一个流程还没有执行完就有其他的执行流再次进入我们称之为重入。
之前在信号部分就提到过重入进程在执行一个函数收到某个信号在处理信号时又调用了这个函数。今天在多线程这里理解重入更加容易我们上面写的多线程代码都是重入的。
2可重入和不可重入的区别一个函数在重入的情况下运行结果不会出现任何不同或者任何问题则该函数被称为可重入函数否则是不可重入函数。
3常见可重入情况
不使用全局变量或静态变量。不使用用malloc或者new开辟出的空间。不返回静态或全局数据所有数据都有函数的调用者提供。
4常见不可重入情况
调用了malloc/free函数因为malloc函数是用全局链表来管理堆的函数。可重入函数体内使用了静态的数据结构。调用了标准I/O库函数标准I/O库的很多实现都以不可重入的方式使用全局数据结构。
总的来说一个函数中如果使用了全局数据或者静态数据以及堆区上的数据就是不可重入的反之就是可重入的。
5.2 线程安全
多个线程并发同一段代码时不会出现不同的结果(数据不一致)。常见对全局变量或者静态变量进行操作并且没有锁保护的情况下会出现该问题。
互斥锁就是让不安全的线程变安全也就是前面我们所学习的内容。
1常见线程安全情况
每个线程对全局变量或者静态变量只有读取的权限而没有写入的权限一般来说这些线程是安全的。类或者接口对于线程来说都是原子操作。多个线程之间的切换不会导致该接口的执行结果存在二义性。
多线程共同执行的代码段中如果有全局变量或者静态变量并且没有保护那么就是线程不安全的。
2常见线程不安全情况
不保护共享变量的函数。函数状态随着被调用状态发生变化的函数。返回指向静态变量指针的函数。
3可重入与线程安全的联系
多线程是通过调用函数来实现的所以线程安全和重入就存在一些联系
函数是可重入的那就是线程安全的因为没有全局或者静态变量不会产生数据不一致问题。函数是不可重入的那就不能由多个线程使用有可能引发线程安全问题。出发对不可重入函数的全局变量进行保护。如果一个函数中有全局变量并且没有保护那么这个函数既不是线程安全也不是可重入的。
4可重入与线程安全的区别
可重入和线程安全是不同的两个东西但是又存在一定的交集。
可重入说的是函数。线程安全说的是线程。
可重入函数是线程安全函数的一种因为不存在全局或者静态变量。
线程安全不一定是可重入的而可重入函数则一定是线程安全的。因为线程安全的情况可能是对全局变量进行了保护(加了锁)。
由于线程可以加锁所以说线程安全的情况比可重入要多。
6. 死锁
6.1 死锁的概念
我们前面例子中写的都是只有一把锁的情况在实际使用中有可能会存在多把锁此时就可能造成死锁。
死锁一组执行流中的各个执行流均占有不会释放的锁资源但因互相申请被其他进程所站用不会释放的锁资源而处于的一种永久等待状态。
通俗来说就是一个线程自己持有锁并且不会释放但是还要申请其他线程的锁此时就容易造成死锁。
一把锁也是会死锁的连续申请俩次就是死锁。
在上面演示一个线程暂时申请锁失败而阻塞时就是死锁。
死锁的逻辑链条 可以看到往往解决一个问题就会引出新的问题然后再区解决新的问题。
6.2 死锁的必要条件
1死锁的四个必要条件
互斥条件一个资源每次只能被一个执行流使用
这一点不用说只要用到锁就会互斥。
请求与保持条件一个执行流因请求资源而阻塞时对已获得的资源保持不放
请求就是指一个执行流申请其他锁保持是指不释放自己已经持有的锁。
不剥夺条件一个执行流已获得的资源在末使用完之前不能强行剥夺
一个执行流已经持有锁在不主动释放前不能强行剥夺。
循环等待条件若干执行流之间形成一种头尾相接的循环等待资源的关系请求 线程ABC都持有一把锁并且不释放。
线程A 申请 线程B持有的锁B线程B 申请 线程C持有的锁C线程C 申请 线程A持有的锁A
此时就构成了环路阻塞等待。
只有符合上面四个条件就会造成死锁。而要破坏死锁只要破坏其中一个条件即可。
2避免死锁
四个必要条件中的第一个无法破坏因为我们使用的就是锁锁就具有互斥的性质。只能破坏其他三个条件。
避免锁未释放的场景
这是为了破坏请求与保持条件。当一个执行流在申请另一个锁的时候要先释放已经持有的锁再申请。
加锁顺序一致
这是为了避免形参环路等待只要不构成环路即可。
资源一次性分配
临界资源尽量一次性分配好不要分布在太多的地方加锁这样的话导致死锁的概率就会增加。
3避免死锁的算法
死锁检测算法银行家算法
上面两种算法了解即可。
采用算法来避免死锁时就会有一个执行流专门用来监测其他执行流的状态一旦发现某个执行流长时间没有执行就释放它所持有的锁。 从解锁的伪代码只能可以看出解锁是可以由其他线程来完成的只需要将锁重新赋值到锁的共享资源变量中即可。
总结互斥锁在实际中能不用就不用实在没有办法的时候也要尽量少用。
7. Linux线程同步
7.1 同步概念与竞态条件
首先抛出一个问题线程互斥它是对的但是它在任何场景合理都吗
答不一定合理。
举个例子
我们去食堂打饭食堂打饭的规则是竞争式的抢饭不用排队。力气大的人会优先去抢到饭男生 --优先级高的线程力气小的就会一直抢不到饭女生 – 优先级小的线程。这种规则没有错确实食堂阿姨一次只能给一个人打饭但是不合理会造成弱小的人的饥饿问题迟迟没有吃到饭在多线程竞争锁来看优先级高的线程会一直优先申请到锁资源而优先级低的线程会长时间得不到对应的资源会造成多执行流下的饥饿问题互斥下的饥饿问题多线程下的某个执行流长时间得不到某种资源。
同步概念
同步在保证数据安全的前提下(互斥)让线程能够按照某种特定的顺序访问临界资源从而有效避免饥饿问题叫做同步。竞态条件因为时序(CPU调度)问题而导致程序异常我们称之为竞态条件。在线程场景下这种问题也不难理解。同步与互斥是互帮互助的互斥是解决线程安全问题而同步是解决合理性问题。
7.2 条件变量
我们已经直到同步是什么了那么如何实现同步与互斥呢
答条件变量。
条件变量概念
当一个线程互斥地访问临界资源时需要另一个线程对临界资源的状态做改变就需要条件变量了。例如一个线程访问队列时发现队列为空它只能等待直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。当条件由程序员设置满足时设置的条件变量就会阻塞等待执行该函数的线程并且释放锁随后下一个线程就会申请到锁继续判断。
下面代码后面详细讲解
pthread_mutex_lock()
if (YES/NO)
{pthread_cond_wait()
}
// ....做其他事情
pthread_cond_signal() // 或者唤醒其他线程, 也可以在主线程判断唤醒
pthread_mutex_unlock(); // 解锁7.3 条件变量相关接口
7.3.1 初始化和销毁条件变量
1初始化条件变量有二种方法
第一种方法静态分配
#include pthread.h
pthread_cond_t cond PTHREAD_COND_INITIALIZER;代码解析
pthread_cond_t是条件变量它是一个联合体里面有一个结构体描述条件变量的属性。PTHREAD_COND_INITIALIZER它是一个宏用于初始化条件变量。注意静态分配不用释放条件变量。
第二个方法动态分配
#include pthread.h
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrictattr);参数
cond要初始化的条件变量pthread_mutex_t变量的地址。restrictattr设置条件变量的属性一般为NULL/nullptr。
返回值初始化成功返回0失败返回一个错误码errno。 注意动态分配需要释放条件变量。
2销毁条件变量
#include pthread.h
int pthread_cond_destroy(pthread_cond_t *cond)参数
cond要销毁的条件变量pthread_cond_t变量的地址。
返回值初始化成功返回0失败返回一个错误码errno。
销毁条件变量需要注意
使用 PTHREAD_ COND_ INITIALIZER 初始化的条件变量不需要销毁。使用 pthread_cond_init初始化的条件变量需要进行销毁。
7.3.2 阻塞等待条件函数
#include pthread.h
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);参数
cond阻塞要在这个条件变量上等待的线程**pthread_cond_t变量的地址**。mutex互斥锁同步需要与互斥锁绑定使用因为阻塞等待时会释放该线程的锁后面被唤醒时会重新获取锁后面代码感受。
返回值成功完成后返回零值否则返回错误编号(errno)以指示错误。
7.3.3 唤醒阻塞等待的条件变量函数
#include pthread.h
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);参数
cond唤醒在条件变量上等待的线程pthread_cond_t变量的地址。pthread_cond_broadcast唤醒被阻塞等待的全部线程。pthread_cond_signal唤醒被阻塞等待的一个线程按顺序唤醒。
返回值如果成功pthread_cond_broadcast()和pthread-cond_signal()函数返回零否则应返回一个错误编号(errno)以指示错误。
使用同步与互斥实现多线程间轮询运行在主线程唤醒被条件变量阻塞的线程。
7.4 为什么 pthread_cond_wait 需要互斥锁?
条件与条件变量
条件对应的共享资源的状态(比如抢票票数小于0就不能抢了)通过判断的方式来判断对应的资源是否符合要求。条件变量在条件满足或不满足的前提下进行wait(等待) 或 signal(唤醒)的一种方式。
结论
条件等待是线程间同步的一种手段如果只有一个线程条件不满足一直等下去都不会满足。所以必须要有一个线程通过某些操作改变共享变量使原先不满足的条件变得满足并且友好的通知唤醒等待在条件变量上的线程。条件不会无缘无故的突然变得满足了必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。 按照上面的说法我们设计出如下的代码先上锁发现条件不满足解锁然后等待在条件变量上不就行了如下代码
// 错误的设计
pthread_mutex_lock(mutex);
while (condition_is_false)
{pthread_mutex_unlock(mutex);// 解锁之后等待之前条件可能已经满足信号已经发出但是该信号可能被错过 -- 发生线程切换pthread_cond_wait(cond);pthread_mutex_lock(mutex);
} pthread_mutex_unlock(mutex);结论
由于解锁和等待不是原子操作。调用解锁之后 pthread_cond_wait 之前如果已经有其他线程获取到互斥量摒弃条件满足发送了信号那么 pthread_cond_wait 将错过这个信号可能会导致线程永远阻塞在这个 pthread_cond_wait 。所以解锁和等待必须是一个原子操作。int pthread_cond_wait(pthread_cond_ t *cond,pthread_mutex_ t * mutex); 进入该函数后会去看条件量等于0不等于就把互斥量变成1直到cond_ wait返回把条件量改成1把互斥量恢复成原样 – 原子操作。
条件变量使用规范
等待条件代码
pthread_mutex_lock(mutex);
while (条件为假
pthread_cond_wait(cond, mutex);
//修改条件
pthread_mutex_unlock(mutex);给条件发送信号代码
pthread_mutex_lock(mutex);
//设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(mutex);8. 生产者消费者模型
8.1 为何要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯而通过阻塞队列来进行通讯所以生产者生产完数据之后不用等待消费者处理直接扔给阻塞队列消费者不找生产者要数据而是直接从阻塞队列里取阻塞队列就相当于一个缓冲区平衡了生产者和消费者的处理能力。
这个阻塞队列就是用来给生产者和消费者解耦的。
8.2 生产者消费者模型优点
解耦支持并发支持忙闲不均 8.3 基于阻塞队列的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于当队列为空时从队列获取元素的操作将会被阻塞直到队列中被放入了元素当队列满时往队列里存放元素的操作也会被阻塞直到有元素被从队列中取出以上的操作都是基于不同的线程来说的线程在对阻塞队列进程操作时会被阻塞。
8.4 C模拟阻塞队列的生产消费模型
1BlockQueue.hpp
#include iostream
#include queue
#include unistd.h
#include pthread.h
#include stdlib.h
using namespace std;#define NUM 8class blockqueue
{
public:blockqueue(int size NUM) //构造函数:_size(size){pthread_mutex_init(_lock, nullptr);pthread_cond_init(_full, nullptr);pthread_cond_init(_empty, nullptr);}void pushdata(const int data) //生产{lockqueue(); //加锁while(IsFull()){//数据已满NotifyConsumer(); //唤醒消费者cout queue full, notify consume data, product stop. endl;producterwait(); //生产者等待}_q.push(data);NotifyConsumer();unlockqueue(); //解锁}void popdata(int data) //消费{lockqueue(); //加锁while(IsEmpty()){//空列表NotifyProducter();cout queue empty, notify product data, consume stop. endl;consumerWait();}data _q.front();_q.pop();NotifyProducter();unlockqueue(); //解锁}~blockqueue() //析构函数{pthread_mutex_destroy(_lock);pthread_cond_destroy(_full);pthread_cond_destroy(_empty);}private:void lockqueue() //加锁{pthread_mutex_lock(_lock);}void unlockqueue() //解锁{pthread_mutex_unlock(_lock);}void producterwait() //生产者线程等待{pthread_cond_wait(_full, _lock);}void consumerWait() //消费者线程等待{pthread_cond_wait(_empty, _lock);}void NotifyProducter() //唤醒阻塞等待的生产者{pthread_cond_signal(_full);}void NotifyConsumer() //唤醒阻塞等待的消费者{pthread_cond_signal(_empty);}bool IsEmpty() //判断队列是否是空{return (_q.size() 0 ? true : false);}bool IsFull() //判断队列是否满{return (_q.size() _size ? true : false);}queueint _q;int _size;pthread_mutex_t _lock;pthread_cond_t _full;pthread_cond_t _empty;
};2test.cpp
#include BlockQueue.hppvoid* consumer(void* args)
{blockqueue* bqc static_castblockqueue*(args);int data;while(1){bqc-popdata(data);cout Consume data done : data endl;//usleep(10000);sleep(1);}
}void* producter(void* args)
{blockqueue* bqp static_castblockqueue*(args);srand((unsigned long)time(NULL));while (1){int data rand() % 1024;bqp-pushdata(data);cout Prodoct data done: data endl;//usleep(10000);sleep(1);}
}int main()
{blockqueue bq;pthread_t con;pthread_t pro;pthread_create(con, nullptr, consumer, (void*)bq);pthread_create(pro, nullptr, producter, (void*)bq);pthread_join(con, NULL); //线程等待消费者pthread_join(pro, NULL); //线程等待生产者return 0;
}3运行结果 8.5 POSIX信号量
8.5.1 什么是POSIX信号量
POSIX和System V都是可移植的操作系统接口标准它们都定义了操作系统为应用程序提供的接口标准。
POSIX信号量和System V信号量作用相同都是用于同步和互斥操作以达到无冲突的访问共享资源目的。System V版本的信号量只适用于实现进程间的通信而POSIX版本的信号量主要用于实现线程之间的通信。
信号量信号灯本质是一个是用来对临界资源进行更细粒度地描述和管理的计数器。
POSIX信号量主要用于实现线程间的同步。
8.5.2 POSIX信号量实现原理
1信号量的结构如下 2结构体成员
count记录还有多少小块的临界资源未被使用。queue当count为0时其它未申请到信号量的线程的task_struct地址会被放到信号量等待队列中阻塞挂起。
3信号量的PV操作
P操作我们把申请信号量得操作称为P操作申请信号量的本质就是申请获得整块临界资源中某小块资源的使用权限当申请成功时临界资源中小块资源的数目应该减一因此P操作的本质就是让count- -。V操作我们将释放信号量称为V操作释放信号量的本质就是归还临界资源中某块资源的使用权限当释放成功时临界资源中资源的数目就应该加一因此V操作的本质就是让count。
申请不到信号量的线程被阻塞挂起
当count为0时表示不允许其它线程再访问临界资源这时其它申请信号量的线程会被阻塞到该信号量的等待队列中直到其它线程释放信号量。
8.5.3 POSIX信号量接口函数
1初始化信号量
#include semaphore.h //头文件
int sem_init(sem_t *sem, int pshared, unsigned int value);参数
pshared0表示线程间共享非零表示进程间共享。value信号量初始值。
2销毁信号量
#include semaphore.h //头文件
int sem_destroy(sem_t *sem);参数
sem要销毁的信号量。
3等待信号量
#include semaphore.h //头文件
int sem_wait(sem_t *sem); //P()功能等待信号量会将信号量的值减1 参数
sem要进行等待的信号量。 4发布信号量
#include semaphore.h //头文件
int sem_post(sem_t *sem); //V()功能发布信号量表示资源使用完毕可以归还资源了。将信号量值加1。
如上生产者-消费者的例子是基于queue的其空间可以动态分配如下基于固定大小的环形队列重写这个程序POSIX信号量
8.6 基于环形队列的生产消费模型
8.6.1 环形队列
环形队列的模型图如下生产者和消费者共用一个容器一开始两者从同一个地方出发生产者先放数据消费者跟在后面拿数据。为了保证这个行为可以持续我们需要遵守以下几个规则。 1消费者不能超过生产者 消费者取数据的前提就是所在位置中有数据。如果生产者只生产了三个数据但是消费者却已经走了四个位置这很显然是不合理的当所在位置无数据时消费者应该停下来。
所以我们采取的措施是设置空白位置的信号量blank_sem、数据个数的信号量data_sem。
信号量的本质就是一个计数器生产者要添加数据就需要空位置所以需要一个记录有多少空白位置的计数器消费者要减少数据就需要有数据所以需要一个有多少数据可供自己消费的计数器。可消费的数据个数为0时data_sem就会阻止消费者继续消费数据同时将消费者线程挂起 2生产者 - 消费者通过下标添加或者消费数据 容器的大小是有限的假设容量大小capacity 10因为这里是一个环形队列采用push_back尾插的方式会造成越界
所以这里引入下标 p_index、c_index分别表示生产者生产到哪个位置了、消费者消费到哪个位置了。
每当生产者添加一个数据p_index然后 p_index % capacity。 每当消费者消费一个数据c_index然后 c_index % capacity。 ps取模的目的是将下标控制在 0~capacity-1之间
3如下基本规则
生产者只关心是否还有格子用来生产数据。消费者只关心环形队列中是否还有数据。一开始没有数据生产者和消费者指向同一个位置这时生产者要先执行生产操作消费者阻塞挂起数据满时生产者和消费者也指向同一个位置这时消费者先执行消费操作再轮到生产者生产。生产者和消费者不能同时访问队列中的同一个位置。生产者和消费者可以并发访问环形队列中的不同位置。 8.6.2 环形队列的实现
1成员变量说明
这里用一个数组来模拟环形队列因为生产者和消费者要并发执行且不能同时操作相同位置的数据刚好数组可以通过下标随机访问数据所以这里我们选用数组。定义了两个无符号整型对象p_index和c_index分别指向生产者要生产数据的格子下标和消费者要拿取数据的位置下标。还定义了_proSem和_cusSem两个信号量对象分别记录着环形队列中格子数量和以生产数据个数。最后还有必要记录环形队列的容量大小可以用它来取模更新p_index和c_index的值。
1RingQueue.hpp
#pragma once
#include iostream
#include semaphore.h
#include vector
#include ctime
#include unistd.h
#include pthread.h
using namespace std;#define N 11
#define CUS 3
#define PRO 2templateclass T
class RingQueue
{
public:RingQueue(int capacity N) //构造函数:_v(capacity),_capacity(capacity),p_index(0),c_index(0){sem_init(_proSem, 0, N);sem_init(_cusSem, 0, 0);}void push(const T data) // 生产者生产数据{P(_proSem);lock(p_lock);_v[p_index] data;p_index;p_index % _capacity;unlock(p_lock);V(_cusSem);}T pop() // 消费者消费数据{P(_cusSem);lock(c_lock);T tmp _v[c_index];c_index;c_index % _capacity;unlock(c_lock);V(_proSem);return tmp;}~RingQueue() //析构函数{sem_destroy(_cusSem);sem_destroy(_proSem);}private: void lock(pthread_mutex_t mutex) //加锁{pthread_mutex_lock(mutex);}void unlock(pthread_mutex_t mutex) //解锁{pthread_mutex_unlock(mutex);}void P(sem_t s) // 申请信号量{ sem_wait(s); }void V(sem_t s) // 释放信号量{sem_post(s); }vectorT _v; // 循环队列sem_t _proSem; // 记录队列中空格数量的信号量sem_t _cusSem; // 记录队列中数据数量的信号量int p_index; // 记录当前空格所在下标 int c_index; // 记录当前数据所在下标int _capacity; // 记录环形队列容量pthread_mutex_t c_lock; // 消费者加锁pthread_mutex_t p_lock; // 生产者加锁
};成员函数说明
将信号量的PV操作进行了封装只需把信号量对象作为参数传入就能完成信号量的申请、释放操作。生产者执行Push()操作生产数据时需要先申请减一_proSem信号量生产完成后释放加一_cusPos信号量让消费者来消费。反之亦然
2单生产单消费
#include RingQueue.hppvoid* Customer(void* argc)
{RingQueueint* rq static_castRingQueueint*(argc);while(1){sleep(1);int data 0;data rq-pop();cout 消费者消费了一个数据: data endl;}
}void* Producer(void* argc)
{RingQueueint* rq static_castRingQueueint*(argc);while(1){int data rand() % N 1;rq-push(data);cout 生者生产了一个数据: data endl;}
}int main()
{srand(time(nullptr)); // 1、制造随机数种子作为生产者push到环形队列当中的数据RingQueueint* rq new RingQueueint(); // 2、new一个环形队列 // 3、分别创建、等待一个生产者和一个消费者 pthread_t tid1;pthread_t tid2;pthread_create(tid1, nullptr, Customer, (void*)rq); pthread_create(tid2, nullptr, Producer, (void*)rq);pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);// 4、最后delete环形队列 delete rq;return 0;
}3运行结果 4多生产多消费 因为循环队列的生产者和消费者的p_index和c_index只有一个这样在多生产多消费的情况下就会产生互斥所以需要加锁进行保护。
#include RingQueue.hpp
#include Task.hpp
struct ThreadData
{RingQueueTask *rq;std::string threadname;
};void* Customer(void* argc)
{ThreadData* td static_castThreadData*(argc);RingQueueTask* rq td-rq;string name td-threadname;while(1){sleep(1);Task t rq-pop();t();cout Consumer get task, task is : t.GetTask() who: name result: t.GetResult() endl;}
}void* Producer(void* argc)
{ThreadData* td static_castThreadData*(argc);RingQueueTask* rq td-rq;string name td-threadname;int len opers.size();while(1){int data1 rand() % 10 1;usleep(10);int data2 rand() % 10;char op opers[rand() % len];Task t(data1, data2, op);rq-push(t);cout Productor task done, task is : t.GetTask() who: name endl;}
}int main()
{srand(time(nullptr)); // 1、制造随机数种子作为生产者push到环形队列当中的数据RingQueueTask* rq new RingQueueTask; // 2、new一个环形队列 // 创建、等待多个生产者和多个消费者pthread_t custid[CUS];pthread_t protid[PRO];for(int i 0; i CUS; i){ThreadData* td new ThreadData();td-rq rq;td-threadname Consumer- std::to_string(i);pthread_create(custid i, nullptr, Customer, (void*)td);}for(int i 0; i PRO; i){ThreadData* td new ThreadData();td-rq rq;td-threadname Producer- std::to_string(i);pthread_create(protid i, nullptr, Producer, (void*)td);}for(int i 0; i CUS; i){pthread_join(custid[i], nullptr);}for(int i 0; i PRO; i){pthread_join(protid[i], nullptr);}// 4、最后delete环形队列 delete rq;return 0;
}5Task.hpp
#pragma once
#include iostream
#include stringstd::string opers-*/%;enum{DivZero1,ModZero,Unknown
};class Task
{
public:Task(){}Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0){}void run(){switch (oper_){case :result_ data1_ data2_;break;case -:result_ data1_ - data2_;break;case *:result_ data1_ * data2_;break;case /:{if(data2_ 0) exitcode_ DivZero;else result_ data1_ / data2_;}break;case %:{if(data2_ 0) exitcode_ ModZero;else result_ data1_ % data2_;} break;default:exitcode_ Unknown;break;}}void operator ()(){run();}std::string GetResult(){std::string r std::to_string(data1_);r ;r oper_;r ;r std::to_string(data2_);r ;r std::to_string(result_);r [code: ;r std::to_string(exitcode_);r ];return r;}std::string GetTask(){std::string r std::to_string(data1_);r ;r oper_;r ;r std::to_string(data2_);r ?;return r;}~Task(){}private:int data1_;int data2_;char oper_;int result_;int exitcode_;
};6运行结果 9. 线程池
9.1 基本概念
线程池thread pool一种线程使用模式线程过多会带来调度开销进而影响缓存局部性和整体性能。而线程池维护着多个线程等待着监督管理者分配可并发执行的任务。这避免了在短时间任务创建与销毁线程的代价。线程池不仅能够保证内核的充分利用还能防止过分调度。可用线程数据取决于可用的并发处理器、处理器内核、内存、网络sockets等数量。
9.2 线程池工作的四种情况
9.2.1 主程序当前没有任务要执行线程池中任务队列为空闲状态
下面情况下所有工作线程处于空闲的等待状态任务缓冲队列为空。
9.2.2 主程序添加小于等于线程池中线程数量得任务
基于10.2.1情况所有的工作线程已处于等待状态主线程开始添加三个任务添加后通知唤醒线程池中的线程开始取任务执行。此时的任务缓冲队列还是空。
9.2.3 主程序添加任务数量大于当前线程池中线程数量的任务
基于10.2.2 情况所有工作线程都在工作中主线程开始添加第四个任务添加后发现现在线程池中线程用完了于是存入任务缓冲队列。工作线程空闲后主动从任务队列取任务执行。
9.2.4 主程序添加任务数量大于当前线程池中线程数量的任务且任务缓冲队列已满
此情况发生情形3且设置了任务缓冲队列大小后面主程序添加第N个任务添加后发现线程池中线程已经用完了任务缓冲队列已满于是进入等待状态等待任务缓冲队列中任务腾空通知。但是这种情形会阻塞主线程本文不限制任务队列的大小必要时再优化。
9.3 线程池的实现 线程池的主要组成由三个部分构成
任务队列Task Quene线程池Thread Pool完成队列Completed Tasks
1ThreadPool.hpp 等待通知机制通过条件变量来实现。
#pragma once
#include iostream
#include vector
#include string
#include queue
#include pthread.h
#include unistd.h
#include ctime
using namespace std;#define N 5struct ThreadInfo
{pthread_t tid;std::string name;
};templateclass T
class ThreadPool
{
public:void lock() //加锁{pthread_mutex_lock(_mutex);}void unlock() //解锁{pthread_mutex_unlock(_mutex);}void Wakeup() //唤醒线程{pthread_cond_signal(_cond);}void ThreadSleep(){pthread_cond_wait(_cond, _mutex);}bool IsQueueEmpty(){return _qt.empty();}string GetThreadName(pthread_t tid){for(const auto ti : _v){if(ti.tid tid){return ti.name;}}return None;}ThreadPool(int num N) //构造函数:_v(N){pthread_mutex_init(_mutex, nullptr);pthread_cond_init(_cond, nullptr);}static void* HandlerTask(void* args){ThreadPoolT* tp static_castThreadPoolT*(args);string name tp-GetThreadName(pthread_self());while(1){tp-lock();while(tp-IsQueueEmpty()){tp-ThreadSleep();}T t tp-pop();tp-unlock();t();cout name run, result: t.GetResult() endl;}}void Start() //创建线程池{int num _v.size();for(int i 0; i N; i){_v[i].name thread- to_string(i 1);pthread_create((_v[i].tid), nullptr, HandlerTask, this);}}T pop() //从队列当中取出数据{T tmp _qt.front();_qt.pop();return tmp;}void Push(const T t) //往队列当中插入数据{lock();_qt.push(t);Wakeup();unlock();}~ThreadPool() //析构函数{pthread_mutex_destroy(_mutex);pthread_cond_destroy(_cond);}private:vectorThreadInfo _v; //线程池queueT _qt; //任务队列pthread_mutex_t _mutex; //锁pthread_cond_t _cond; //条件变量
};2test.cpp 线程池最重要的方法是负责向队列添加任务由主函数向队列添加任务。
#include Task.hpp
#include ThreadPool.hppint main()
{ThreadPoolTask* tp new ThreadPoolTask(5);tp-Start();srand(time(nullptr));while(1){//1. 构建任务int x rand() % 11 1;usleep(10);int y rand() % 5 1;char op opers[rand() % opers.size()];Task t(x, y, op);tp-Push(t);//2. 交给线程池处理std::cout main thread make task: t.GetTask() std::endl;sleep(1);}return 0;
}3运行结果 9.4 对线程进行简单的封装
#pragma once
#include iostream
#include pthread.h
#include vector
#include unistd.h
using namespace std;typedef void (*callback_t)();
static int num 1;class Thread
{
public:Thread(callback_t cb):_cb(cb),_start_timestamp(0),_isrunning(false),_name(){}static void* routine(void* args){Thread* t static_castThread*(args);t-Entery();return nullptr;}void run(){_isrunning true;_name thread- to_string(num);_start_timestamp time(nullptr);pthread_create(_tid, nullptr, routine, this);}void join(){pthread_join(_tid, nullptr);_isrunning false;}string name(){return _name;}uint64_t StartTimestamp(){return _start_timestamp;}bool IsRunning(){return _isrunning;}void Entery(){_cb();}
private:pthread_t _tid; //线程的tidstring _name; //线程名uint64_t _start_timestamp; //线程创建的时间bool _isrunning; //线程是否在运行callback_t _cb; //线程所要执行的函数
};callback_t 是函数指针是线程需要执行的函数
10. 线程安全的单例模式
10.1 单例模式的特点
某些类, 只应该具有一个对象实例化就称之为单例。 例如一个男人只能有一个媳妇。 在很多服务器开发场景中经常需要让服务器加载很多的数据 (上百G) 到内存中此时往往要用一个单例的类来管理这些数据。
10.2 饿汉实现方式和懒汉实现方式
1洗碗的例子
吃完饭立刻洗碗这种就是饿汉方式因为下一顿吃的时候可以立刻拿着碗就能吃饭。吃完饭先把碗放下然后下一顿饭用到这个碗了再洗碗就是懒汉方式。
懒汉方式最核心的思想是 “延时加载”从而能够优化服务器的启动速度。
2饿汉方式实现单例模式
template typename T
class Singleton
{static T data;
public:static T* GetInstance() {return data;}
};只要通过 Singleton 这个包装类来使用 T 对象则一个进程中只有一个T对象的实例。
3懒汉方式实现单例模式
template typename T
class Singleton
{static T* inst;
public:static T* GetInstance() {if (inst NULL) {inst new T();}return inst;}
};存在一个严重的问题线程不安全。 第一次调用 GetInstance 的时候如果两个线程同时调用可能会创建出两份 T 对象的实例。 但是后续再次调用就没有问题了。
10.3 懒汉方式实现单例模式将线程池该成单例
1ThreadPool.hpp
#pragma once
#include iostream
#include vector
#include string
#include queue
#include pthread.h
#include unistd.h
#include ctime
using namespace std;#define N 5struct ThreadInfo
{pthread_t tid;std::string name;
};templateclass T
class ThreadPool
{
public:void lock() //加锁{pthread_mutex_lock(_mutex);}void unlock() //解锁{pthread_mutex_unlock(_mutex);}void Wakeup() //唤醒线程{pthread_cond_signal(_cond);}void ThreadSleep(){pthread_cond_wait(_cond, _mutex);}bool IsQueueEmpty(){return _qt.empty();}string GetThreadName(pthread_t tid){for(const auto ti : _v){if(ti.tid tid){return ti.name;}}return None;}static void* HandlerTask(void* args){ThreadPoolT* tp static_castThreadPoolT*(args);string name tp-GetThreadName(pthread_self());while(1){tp-lock();while(tp-IsQueueEmpty()){tp-ThreadSleep();}T t tp-pop();tp-unlock();t();cout name run, result: t.GetResult() endl;}}void Start() //创建线程池{int num _v.size();for(int i 0; i N; i){_v[i].name thread- to_string(i 1);pthread_create((_v[i].tid), nullptr, HandlerTask, this);}}T pop() //从队列当中取出数据{T tmp _qt.front();_qt.pop();return tmp;}void Push(const T t) //往队列当中插入数据{lock();_qt.push(t);Wakeup();unlock();}static ThreadPoolT* GetInstance(){if(_tp nullptr) //因为if只会进去一次所以后面在加锁会影响效率所以加锁前在判断一次即可避免{pthread_mutex_lock(_lock);if (_tp nullptr){_tp new ThreadPoolT();}pthread_mutex_unlock(_lock);}return _tp;}private:ThreadPool(int num N) //构造函数:_v(N){pthread_mutex_init(_mutex, nullptr);pthread_cond_init(_cond, nullptr);}~ThreadPool() //析构函数{pthread_mutex_destroy(_mutex);pthread_cond_destroy(_cond);}ThreadPool(const ThreadPoolT tp) delete;const ThreadPoolT operator(const ThreadPoolT tp) delete; // abcvectorThreadInfo _v; //线程池queueT _qt; //任务队列pthread_mutex_t _mutex; //锁pthread_cond_t _cond; //条件变量static ThreadPoolT* _tp;static pthread_mutex_t _lock;
};template class T
ThreadPoolT* ThreadPoolT::_tp nullptr;template class T
pthread_mutex_t ThreadPoolT::_lock PTHREAD_MUTEX_INITIALIZER;2test.cpp
#include Task.hpp
#include ThreadPool.hppint main()
{ThreadPoolTask* tp ThreadPoolTask::GetInstance();//ThreadPoolTask* tp new ThreadPoolTask(5);tp-Start();srand(time(nullptr));while(1){//1. 构建任务int x rand() % 11 1;usleep(10);int y rand() % 5 1;char op opers[rand() % opers.size()];Task t(x, y, op);tp-Push(t);//2. 交给线程池处理std::cout main thread make task: t.GetTask() std::endl;sleep(1);}return 0;
}将四个函数私有化后利用静态函数GetInstance来获取唯一对象指针。
11. STL、智能指针的线程安全
1STL中的容器是否是线程安全的?
不是原因是STL 的设计初衷是将性能挖掘到极致而一旦涉及到加锁保证线程安全会对性能造成巨大的影响。而且对于不同的容器加锁方式的不同性能可能也不同(例如hash表的锁表和锁桶)。 因此 STL 默认不是线程安全如果需要在多线程环境下使用往往需要调用者自行保证线程安全。 2智能指针是否是线程安全的? 对于 unique_ptr由于只是在当前代码块范围内生效因此不涉及线程安全问题。 对于 shared_ptr多个对象需要共用一个引用计数变量所以会存在线程安全问题但是标准库实现的时候考虑到了这个问题基于原子操作(CAS)的方式保证 shared_ptr 能够高效原子的操作引用计数。
12. 其他常见的各种锁
悲观锁在每次取数据时总是担心数据会被其他线程修改所以会在取数据前先加锁读锁写锁行锁等当其他线程想要访问数据时被阻塞挂起。乐观锁每次取数据时候总是乐观的认为数据不会被其他线程修改因此不上锁。但是在更新数据前会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式版本号机制和CAS操作。CAS操作当需要更新数据时判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败失败则重试一般是一个自旋的过程即不断重试。自旋锁
为什么会有自旋锁
在编写多线程的时候有些公共数据读的概率远远大于修改的几率。通常而言在读的过程中往往伴随着查找的操作中间耗时很长。给这种代码段加锁会极大地降低我们程序的效率。我们引入了读写锁即自旋锁处理这种多读少写的情况。 什么是自旋锁 它把对共享资源的访问者划分成读者和写者读者只对共享资源进行读访问写者则需要对共享资源进行写操作。 这种锁相对于自旋锁而言能提高并发性。 在多处理器系统中 1对于读者它允许同时有多个读者来访问共享资源最大可能的读者数为实际的逻辑CPU数。 2 对于写者写者是排他性的一个读写锁同时只能有一个写者或多个读者与CPU数相关但不能同时既有读者又有写者。 自旋锁相关的API函数
使用自旋锁必须包含头文件并链接库-lpthread
#include pthread.h1初始化函数
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);功能初始化自旋锁 当线程使用该函数初始化一个未初始化或者被destroy过的自旋锁。该函数会为自旋锁申请资源并且初始化自旋锁为unlocked状态。 参数 pthread_spinlock_t 要初始化自旋锁 pshared取值 PTHREAD_PROCESS_SHARED该自旋锁可以在多个进程中的线程之间共享。可以被其他进程中的线程看到PTHREAD_PROCESS_PRIVATE: 仅初始化本自旋锁的线程所在的进程内的线程才能够使用该自旋锁。
返回值若成功返回0否则返回错误编号
2销毁函数
int pthread_spin_destroy(pthread_spinlock_t *lock);功能用来销毁指定的自旋锁并释放所有相关联的资源所谓的资源指的是由pthread_spin_init自动申请的资源如果调用该函数时自旋锁正在被使用或者自旋锁未被初始化则结果是未定义的。 参数
pthread_spinlock_t 要销毁的自旋锁
返回值若成功返回0否则返回错误编号
3加锁函数
int pthread_spin_lock(pthread_spinlock_t *lock);功能用来获取锁定指定的自旋锁. 如果该自旋锁当前没有被其它线程所持有则调用该函数的线程获得该自旋锁否则该函数在获得自旋锁之前不会返回。 参数
pthread_spinlock_t 要加锁的自旋锁
返回值若成功返回0否则返回错误编号
4解锁函数
int pthread_spin_unlock(pthread_spinlock_t *lock);功能用来解锁指定的自旋锁.。 参数
pthread_spinlock_t 要加锁的自旋锁
返回值若成功返回0否则返回错误编号
13. 读者写者问题
在编写多线程的时候有一种情况是十分常见的。那就是有些公共数据修改的机会比较少。相比较改写它们读的机会反而高的多。通常而言在读的过程中往往伴随着查找的操作中间耗时很长。如果给这种代码段加锁会极大地降低我们程序的效率。 那么有没有一种方法可以专门处理这种多读少写的情况呢 有那就是读写锁。
13.1 读者与写者的关系
为了解决我们的问题我们先来分析多线程中两个角色的关系读者写者。
写者与写者显然写者与写者之间的关系是互斥的多个写者之间要用锁来进行保证线程安全。读者与读者读者与读者都只是访问数据而不会修改数据所以多个读者访问数据没有线程安全读者之间毫无关系。读者与写者当读者与写者同时访问数据时明显有线程安全的问题只有读者读取完了再让写者写或者写者写完了再让读者读取才是合理的。所以读者与写者之间存在互斥且同步的关系。
13.2 读写锁的API函数
使用自旋锁必须包含头文件并链接库-lpthread
#include pthread.h1初始化/销毁函数
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);功能初始化/销毁一个读写锁。 参数
pthread_rwlock_t : 读写锁的数据结构pthread_rwlockattr_t : 读写锁属性的数据结构一般直接设置为空。
返回值若成功返回0否则返回错误编号
2读者加锁函数
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);功能当读者要访问数据时要申请先读者锁拿到读者锁以后才能访问资源。返回值若成功返回0否则返回错误编号
3写者加锁函数
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);功能当写者要访问数据时要申请先写者锁拿到写者锁以后才能访问资源。返回值若成功返回0否则返回错误编号
4解锁函数
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);功能无论是读者解锁还是写者解锁都要使用此函数。 返回值若成功返回0否则返回错误编号
13.3 伪代码理解读写锁的原理
想要使用正确的使用读写锁就还要简单理解一下读写锁的原理而读写锁就是要维护好上面的读者与写者的关系。 如下代码来理解读写锁的原理。 1读者的逻辑 2写者的逻辑 3简单分析
当读者先加锁则读者拿到阅览室临界区的使用权不影响后来其他读者只会拦截写者直到读者解锁。当写者先加锁后来的写者与读者都不能够进入直到写者解锁。实际中由于读者众多可能会有读者络绎不绝的进入阅览室临界区从而导致写者迟迟无法获得阅览室临界区的使用权进而导致写者饥饿问题又叫读者优先策略 注意写独占读共享读锁优先级高。
写者优先策略的实现方式当写者加锁没有成功在写者后面来的读者全部都不能够进入阅览室临界区等阅览室已经存在的读者全部离开以后写者先进入阅览室临界区进行修改然后才能让写者后面来的读者进入阅览室临界区。pthread库里面给我们提供了一个函数可以设置读写优先级使用man查不到这个函数但是可以在pthread.h头文件中发现如果我们不设置默认是读者优先。
int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);/*
pref 共有 3 种选择
PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先可能会导致写者饥饿情况
PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先目前有 BUG导致表现行为和PTHREAD_RWLOCK_PREFER_READER_NP 一致
PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先但写者不能递归加锁
*/13.4 读写锁的演示
#include iostream
#include sstream
#include string
#include vector
#include pthread.h
#include unistd.husing namespace std;// 线程的属性
struct ThreadAttr
{pthread_t _tid;string _name;
};// 票数
volatile int ticket 100;
// 读写锁
pthread_rwlock_t rwlock;void rwattr_init(pthread_rwlockattr_t* pattr, int flag) // 读写锁属性初始化
{pthread_rwlockattr_init(pattr); if (flag 0) // flag为0表示读者优先其他表示写着优先{pthread_rwlockattr_setkind_np(pattr, PTHREAD_RWLOCK_PREFER_READER_NP);}else{pthread_rwlockattr_setkind_np(pattr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);}
}void rwattr_destroy(pthread_rwlockattr_t* prwlock) // 读写锁属性的销毁
{pthread_rwlockattr_destroy(prwlock);
}void rwlock_init(pthread_rwlock_t* prwlock, int flag 0) // 锁的初始化
{pthread_rwlockattr_t rwattr;rwattr_init(rwattr, flag);pthread_rwlock_init(prwlock, rwattr);rwattr_destroy(rwattr);
}const string create_writer_name(size_t i) // 创建name
{stringstream ssm(thread writer : , ios::in | ios::out | ios::ate);ssm i;return ssm.str();
}const string create_reader_name(size_t i)
{stringstream ssm(thread reader : , ios::in | ios::out | ios::ate);ssm i;return ssm.str();
}void* readerRoutine(void* args) // 读者历程
{string* ps static_caststring*(args);// 进行查票while (true){pthread_rwlock_rdlock(rwlock);if (ticket ! 0){cout *ps ticket number : ticket endl;}else{cout *ps done!!!!! endl;// 防止死锁pthread_rwlock_unlock(rwlock);break;}pthread_rwlock_unlock(rwlock);// 休眠0.1msusleep(100);}
}void* writerRoutine(void* args) // 写者历程
{string* ps static_caststring*(args);// 进行改票while (true){pthread_rwlock_wrlock(rwlock);if (ticket ! 0){cout *ps ticket number : --ticket endl;}else{cout *ps done!!!!! endl;// 防止死锁pthread_rwlock_unlock(rwlock);break;}pthread_rwlock_unlock(rwlock);// 休眠0.1msusleep(100);}
}void reader_init(vectorThreadAttr readers)
{int i 1;for (auto e : readers){e._name create_reader_name(i);pthread_create(e._tid, nullptr, readerRoutine, e._name);}
}void writer_init(vectorThreadAttr writers)
{int i 1;for (auto e : writers){e._name create_writer_name(i);pthread_create(e._tid, nullptr, writerRoutine, e._name);}
}void reader_join(const vectorThreadAttr readers)
{for (auto e : readers){pthread_join(e._tid, nullptr);}
}void writer_join(const vectorThreadAttr writers)
{for (auto e : writers){pthread_join(e._tid, nullptr);}
}int main()
{// 初始化锁并设置读写者优先属性rwlock_init(rwlock, 0);const int reader_count 30;const int writer_count 2;vectorThreadAttr readers(reader_count);vectorThreadAttr writers(writer_count);reader_init(readers);writer_init(writers);reader_join(readers);writer_join(writers);return 0;
}