国外网站的分析工具有哪些,办公室装修计入什么会计科目,网站建设的开发方法有哪些,WordPress古腾堡插件线程同步与互斥 一.线程互斥1.互斥相关概念2.互斥锁 Mutex3.互斥锁接口4.互斥锁实现原理5.互斥锁封装 二.线程同步1.同步相关概念2.条件变量 Condition Variable3.条件变量接口4.条件变量封装5.信号量 Semaphore6.信号量接口7.信号量封装8.生产者 - 消费者模型1.基于 Blocking … 线程同步与互斥 一.线程互斥1.互斥相关概念2.互斥锁 Mutex3.互斥锁接口4.互斥锁实现原理5.互斥锁封装 二.线程同步1.同步相关概念2.条件变量 Condition Variable3.条件变量接口4.条件变量封装5.信号量 Semaphore6.信号量接口7.信号量封装8.生产者 - 消费者模型1.基于 Blocking Queue 的生产者 - 消费者模型2.基于 Ring Queue 的生产者 - 消费者模型 三.线程池1.日志和策略模式 本节重点
深刻理解线程互斥的原理和操作。深刻理解线程同步。掌握生产消费模型。设计日志和线程池。理解线程安全和可重入掌握锁相关概念。
一.线程互斥
1.互斥相关概念
共享资源可以被多个进程或线程可以共同访问和使用的资源。临界资源被保护的共享资源它在同一时刻只允许一个进程或线程访问该资源。临界区每个线程内部访问临界资源的代码就叫做临界区。原子性不会被任何调度机制打断的操作该操作只有两态要么完成要么未完成。互斥在同一时刻只允许一个线程或进程访问共享资源。确保对共享资源的操作具有原子性避免多个线程或进程同时对共享资源进行读写操作而导致的数据竞争和不一致问题。
2.互斥锁 Mutex
大部分情况线程使用的数据都是局部变量变量的地址空间在线程栈空间内这种情况变量归属单个线程其它线程无法获得这种变量。但有时候很多变量都需要在线程间共享这样的变量称为共享变量可以通过数据的共享完成线程之间的交互。但是多个线程并发的操作共享变量存在数据安全问题。
#include iostream
#include vector
#include Pthread.hpp
using namespace ThreadModule;#define NUM 4int ticketnum 10000; // 共享资源void Ticket()
{while(true){if(ticketnum 0){usleep(1000);// 1.抢票std::cout get a new ticket, id: ticketnum std::endl;ticketnum--;// 2.入库模拟// usleep(1000);}else{break;}}
}int main()
{std::vectorThread threads;// 1.构建线程对象for(int i 0; i NUM; i){threads.emplace_back(Ticket);}// 2.启动线程for(auto thread : threads){thread.Start();}// 3.等待线程for(auto thread : threads){thread.Join();}return 0;
}xzyhcss-ecs-b3aa:~$ ./ticket
get a new ticket, id: 10000
get a new ticket, id: 9999
...
get a new ticket, id: 2
get a new ticket, id: 1
get a new ticket, id: 0
get a new ticket, id: -1
get a new ticket, id: -2为什么可能无法获得正确结果
if 语句判断条件为真以后代码可以并发的切换到其它线程。usleep(1000); 这个模拟漫长业务的过程在这个漫长的业务过程中可能有很多个线程会进入该代码段。ticketnum-- 操作本身就不是一个原子操作。
# 取出ticket--部分的汇编代码
xzyhcss-ecs-b3aa:~$ objdump -d ticket ticket.s
xzyhcss-ecs-b3aa:~$ vim ticket.s
25a3: 8b 05 6b 5a 00 00 mov 0x5a6b(%rip),%eax
25a9: 83 e8 01 sub $0x1,%eax
25ac: 89 05 62 5a 00 00 mov %eax,0x5a62(%rip)ticketnum-- 操作并不是原子操作而是对应三条汇编指令
load将共享变量 ticket 从内存加载到寄存器中。update更新寄存器里面的值执行-1操作。store将新值从寄存器写回共享变量 ticket 的内存地址。 要解决以上问题需要做到三点
代码必须要有互斥行为当代码进入临界区执行时不允许其它线程进入该临界区。如果多个线程同时要求执行临界区的代码并且临界区没有线程在执行那么只能允许一个线程进入该临界区。如果线程不在临界区中执行那么该线程不能阻止其它线程进入临界区。
要做到这三点本质上就是需要一把锁。Linux上提供的这把锁叫互斥锁(也叫互斥量) 所有对资源的保护都是对临界区代码的保护因为资源是通过代码访问的加锁一定不能大块代码进行加锁要保证细粒度锁本身是全局的也是共享资源。锁保护共享资源那么谁保证锁加锁和解锁被设计称为原子要么执行完要么未被执行不需要被保护二元信号量就是锁加锁的本质就是对资源展开预定整体使用资源如果申请锁的时候被其它线程拿走了其它线程要进行阻塞等待保证只能有一个线程访问资源线程在访问临界区代码时线程可以切换可以切换但是锁被线程拿走了其它线程无法进入临界区串行原子性效率低的原因
3.互斥锁接口
功能: 初始化互斥锁// 静态分配(编译确定内存的大小和位置): 不需要销毁
pthread_mutex_t mutex PTHREAD_MUTEX_INITIALIZER;// 动态分配(运行确定内存的大小和位置): 需要销毁
原型: int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
参数mutex: 要初始化的互斥锁指针
参数attr: nullptr功能: 销毁互斥锁
原型: int pthread_mutex_destroy(pthread_mutex_t *mutex);
参数mutex: 要销毁的互斥锁指针注意
使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥锁不需要销毁。不要销毁一个已经加锁的互斥锁。已经销毁的互斥锁要确保后面不会有线程再尝试加锁。
功能: 加锁, 其它线程不可以访问共享资源
原型: int pthread_mutex_lock(pthread_mutex_t *mutex);功能: 解锁, ,其它线程可以访问共享资源
原型: int pthread_mutex_unlock(pthread_mutex_t *mutex);调用 pthread_mutex_lock 时可能会遇到以下情况
互斥锁处于未锁状态该函数会将互斥锁锁定同时返回成功。发起函数调用时其它线程已经锁定互斥锁或者存在其它线程同时申请互斥量但没有竞争到互斥锁那么 pthread_mutex_lock 调用会陷入阻塞(执行流被挂起调度其它线程)等待互斥锁解锁。
// Pthread.hpp 在上一篇博客线程概念与控制, 模版封装线程库中// ticket.cc
#include iostream
#include vector
#include string
#include Pthread.hpp
using namespace ThreadModule;#define NUM 4// pthread_mutex_t lock PTHREAD_MUTEX_INITIALIZER; // 锁也是共享资源
int ticketnum 1000; // 共享资源class ThreadData
{
public:std::string name;pthread_mutex_t *lock_ptr;
};void Ticket(ThreadData td)
{while (true){// pthread_mutex_lock(lock)pthread_mutex_lock(td.lock_ptr); // 加锁if (ticketnum 0){usleep(1000);// 1.抢票std::cout td.name get a new ticket, id: ticketnum std::endl;ticketnum--;// pthread_mutex_unlock(lock)pthread_mutex_unlock(td.lock_ptr); // 解锁// 2.入库模拟: 耗时, 防止该线程再次抢锁(访问资源)usleep(50);}else{// pthread_mutex_unlock(lock)pthread_mutex_unlock(td.lock_ptr); // 解锁break;}}
}int main()
{pthread_mutex_t lock;pthread_mutex_init(lock, nullptr);std::vectorThreadThreadData threads;// 1.构建线程对象for (int i 0; i NUM; i){ThreadData* td new ThreadData();td-lock_ptr lock;threads.emplace_back(Ticket, *td);td-name threads[i].Name();}// 2.启动线程for (auto thread : threads){thread.Start();}// 3.等待线程for (auto thread : threads){thread.Join();}pthread_mutex_destroy(lock);return 0;
}xzyhcss-ecs-b3aa:~$ ./ticket
Thread-1 get a new ticket, id: 1000
Thread-2 get a new ticket, id: 999
Thread-3 get a new ticket, id: 998
...
Thread-2 get a new ticket, id: 3
Thread-3 get a new ticket, id: 2
Thread-4 get a new ticket, id: 14.互斥锁实现原理
单纯的 i 或者 i 都不是原子的有可能会有数据安全问题。为了实现互斥锁操作大多数体系结构都提供了 swap 或 exchange 指令该指令的作用是把寄存器和内存单元的数据相交换由于只有一条指令保证了原子性即使是多处理器平台访问内存的总线周期也有先后一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。现在我们把 lock 和 unlock 的伪代码改一下。 5.互斥锁封装
RAII 的核心思想是将资源的获取和初始化放在对象的构造函数中将资源的释放放在对象的析构函数中。实现RAII的加锁方式构造函数实现加锁析构函数实现解锁。
// Mutex.hpp
#pragma once#include pthread.hnamespace MutexModule
{class Mutex{// 互斥锁: 不支持拷贝构造、拷贝赋值Mutex(const Mutex m) delete;Mutex operator(const Mutex m) delete;public:Mutex(){::pthread_mutex_init(_mutex, nullptr);}~Mutex(){::pthread_mutex_destroy(_mutex);}pthread_mutex_t *LockAddr() { return _mutex; }void Lock(){::pthread_mutex_lock(_mutex);}void Unlock(){::pthread_mutex_unlock(_mutex);}private:pthread_mutex_t _mutex;};class LockGuard{public:LockGuard(Mutex mutex): _mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex _mutex; // 使用引用: 互斥锁不支持拷贝};
}// Main.cc
#include string
#include unistd.h
#include Mutex.hpp
using namespace MutexModule;int ticket 1000;
Mutex mtx;void *Ticket(void* args)
{std::string name static_castchar*(args);while(true){// mtx.Lock();LockGuard lg(mtx); // 临时对象: 初始化时自动加锁, 出while循环时自动解锁(RAII风格的加锁方式)if(ticket 0){usleep(1000);std::cout name buys a ticket: ticket std::endl;ticket--;// mtx.Unlock();}else{// mtx.Unlock();break;}}return nullptr;
}int main()
{pthread_t t1, t2, t3;pthread_create(t1, nullptr, Ticket, (void*)thread-1);pthread_create(t2, nullptr, Ticket, (void*)thread-2);pthread_create(t3, nullptr, Ticket, (void*)thread-3);pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);return 0;
}xzyhcss-ecs-b3aa:~$ ./ticket
thread-3 buys a ticket: 1000
thread-3 buys a ticket: 999
thread-3 buys a ticket: 998
...
thread-3 buys a ticket: 3
thread-3 buys a ticket: 2
thread-3 buys a ticket: 1二.线程同步
1.同步相关概念
在互斥的代码中发现同一个线程多次访问资源导致其它进程迟迟访问不到资源导致进程饥饿问题。
同步在保证数据安全的前提下让线程能够按照某种特定的顺序访问临界资源从而有效避免线程饥饿问题叫做同步。竞态条件多个线程或进程对共享资源的访问顺序和时间不确定而导致程序异常。互斥保证在同一时间只有一个线程/进程访问共享资源进而保证数据安全性但是安全不一定合理/高效。同步是在互斥的前提下让系统变得更加合理/高效。如何做到线程同步条件变量
2.条件变量 Condition Variable
例如互斥的三个线程买票时第一个抢到锁的线程发现没有票时它会不断地加锁、什么都做不了、解锁(解锁后最接近加锁条件)导致其它线程处于饥饿状态。当主线程发票之后也是该线程抢到票。
#include iostream
#include unistd.h
#include pthread.hint ticket 0;
pthread_mutex_t mutex PTHREAD_MUTEX_INITIALIZER;void *route(void *args)
{std::string name static_castchar *(args);while (1){pthread_mutex_lock(mutex);if (ticket 0){usleep(1000); // 微秒std::cout name buys a new ticket: ticket std::endl;ticket--;pthread_mutex_unlock(mutex);}else{std::cout name do nothing std::endl;sleep(1);pthread_mutex_unlock(mutex);}}return nullptr;
}int main()
{pthread_t t1, t2, t3;pthread_create(t1, nullptr, route, (void *)thread 1);pthread_create(t2, nullptr, route, (void *)thread 2);pthread_create(t3, nullptr, route, (void *)thread 3);int cnt 3;while(true){sleep(5);ticket cnt;std::cout 主线程发票啦, ticket: ticket std::endl;}pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);return 0;
}xzyhcss-ecs-b3aa:~$ ./testCond
thread 1 do nothing
thread 1 do nothing
thread 1 do nothing
thread 1 do nothing
thread 1 do nothing
主线程发票啦, ticket: 3
thread 1 buys a new ticket: 3
thread 1 buys a new ticket: 2
thread 1 buys a new ticket: 1条件变量通常与互斥锁一起使用。互斥锁用于保护共享资源防止多个线程同时访问和修改这些资源而导致数据不一致而条件变量则用于在线程之间传递状态信息使得线程可以根据特定条件的满足与否来决定是继续执行还是等待。条件变量内部维护的是线程队列实现线程同步
3.条件变量接口
功能: 初始化条件变量// 静态分配(编译确定内存的大小和位置): 不需要销毁
pthread_cond_t cond PTHREAD_COND_INITIALIZER;// 动态分配(运行确定内存的大小和位置): 需要销毁
原型: int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
参数cond: 要初始化的条件变量指针
参数attr: nullptr功能: 销毁条件变量
原型: int pthread_cond_destroy(pthread_cond_t *cond);
参数cond: 要销毁的条件变量指针功能: 让当前线程在指定的条件变量上阻塞等待, 直到其它线程通过线程发送信号/广播, 解除线程阻塞, 再在锁上等待, 直到申请锁成功
原型: int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);工作原理
当线程调用 pthread_cond_wait() 时它会自动释放传入的互斥锁 mutex这是为了让其它线程有机会获取该互斥锁进而修改共享资源使得等待的条件有可能得到满足。线程进入阻塞状态等待在条件变量 cond 上此时线程不会占用 CPU 资源。当其它线程调用 pthread_cond_signal() 或 pthread_cond_broadcast() 对同一个条件变量 cond 发出信号时该线程被唤醒。线程被唤醒后会尝试重新获取之前释放的互斥锁 mutex。若互斥锁当前被其它线程占用该线程会继续阻塞直至成功获取互斥锁。一旦获取到锁线程就会从 pthread_cond_wait() 函数返回继续执行后续代码。
功能: 该函数用于向指定的条件变量cond发出信号, 唤醒一个正在该条件变量上等待的线程
原型: int pthread_cond_signal(pthread_cond_t *cond);功能: 该函数用于向指定的条件变量cond发出广播信号, 唤醒所有正在该条件变量上等待的线程
原型: int pthread_cond_broadcast(pthread_cond_t *cond);注意pthread_cond_signal() 和 pthread_cond_broadcast() 不会自动释放互斥锁调用该函数的线程仍然持有互斥锁。调用后通常需要手动释放互斥锁被唤醒的多个线程会竞争获取互斥锁获取到锁的线程才能继续执行。
#include iostream
#include unistd.h
#include pthread.hpthread_mutex_t mutex PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond PTHREAD_COND_INITIALIZER;void *active(void *args)
{std::string name static_castchar *(args);while (true){pthread_mutex_lock(mutex);// 没有对资源释放就绪的判定// std::cout name is waiting std::endl;pthread_cond_wait(cond, mutex); // mutex???std::cout name is active std::endl;pthread_mutex_unlock(mutex);}
}int main()
{pthread_t tid1, tid2, tid3;pthread_create(tid1, nullptr, active, (void *)thread-1);pthread_create(tid2, nullptr, active, (void *)thread-2);pthread_create(tid3, nullptr, active, (void *)thread-3);sleep(1);std::cout main thread ctrl begin... std::endl;while (true){std::cout main wakeup thread... std::endl;pthread_cond_signal(cond);// pthread_cond_broadcast(cond);sleep(1);}pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);pthread_join(tid3, nullptr);return 0;
}# 按照线程1、2、3的顺序, 实现同步
xzyhcss-ecs-b3aa:~$ ./testCond
main thread ctrl begin...
main wakeup thread...
thread-1 is active
main wakeup thread...
thread-2 is active
main wakeup thread...
thread-3 is active
main wakeup thread...
thread-1 is active
main wakeup thread...
thread-2 is active
main wakeup thread...
thread-3 is active#include iostream
#include unistd.h
#include pthread.hint ticket 0;
pthread_mutex_t mutex PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond PTHREAD_COND_INITIALIZER;void *route(void *args)
{std::string name static_castchar *(args);while (1){pthread_mutex_lock(mutex);if (ticket 0){usleep(1000);std::cout name buys a new ticket: ticket std::endl;ticket--;pthread_mutex_unlock(mutex);}else{pthread_cond_wait(cond, mutex);std::cout 主线程出票完成, name 醒来 std::endl;pthread_mutex_unlock(mutex);}// usleep(50);}return nullptr;
}int main()
{pthread_t t1, t2, t3;pthread_create(t1, nullptr, route, (void *)thread 1);pthread_create(t2, nullptr, route, (void *)thread 2);pthread_create(t3, nullptr, route, (void *)thread 3);int cnt 10;while(true){sleep(5);ticket cnt;std::cout 主线程发票啦, ticket: ticket std::endl;pthread_cond_signal(cond);// pthread_cond_broadcast(cond);}pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);return 0;
}# 按照线程1、2、3的顺序依次抢票, 实现同步
xzyhcss-ecs-b3aa:~$ ./testCond
主线程发票啦, ticket: 3
主线程出票完成, thread 1 醒来
thread 1 buys a new ticket: 3
thread 1 buys a new ticket: 2
thread 1 buys a new ticket: 1
主线程发票啦, ticket: 3
主线程出票完成, thread 2 醒来
thread 2 buys a new ticket: 3
thread 2 buys a new ticket: 2
thread 2 buys a new ticket: 1
主线程发票啦, ticket: 3
主线程出票完成, thread 3 醒来
thread 3 buys a new ticket: 3
thread 3 buys a new ticket: 2
thread 3 buys a new ticket: 14.条件变量封装
// Cond.hpp
#pragma once#include pthread.h
#include Mutex.hpp
using namespace MutexModule;namespace CondModule
{class Cond{public:Cond() {::pthread_cond_init(_cond, nullptr);}~Cond() {::pthread_cond_destroy(_cond);}void Wait(Mutex mutex) // 线程释放曾经持有的锁, 不能拷贝{::pthread_cond_wait(_cond, mutex.LockAddr());}void Signal(){::pthread_cond_signal(_cond);}void Broadcast(){::pthread_cond_broadcast(_cond);}private:pthread_cond_t _cond;};
}5.信号量 Semaphore
信号量一种用于多进程或多线程环境下实现同步与互斥的机制。避免多个进程或线程同时访问共享资源而引发的数据不一致或其他错误。
SystemV 信号量涉及内核系统调用的开销较大性能可能会受到影响并且操作复杂。POSIX信号量接口简洁设计上更注重性能尤其是对于线程间的同步和互斥操作。
信号量本质上是一个计数器用于记录系统中某种资源的可用数量也就是最多允许线程进入共享资源的数量。配合PV两个原子操作来控制对共享资源的访问。PV 原子操作
P 操做如果信号量的值大于 0将信号量的值减 1然后继续执行如果信号量的值为 0则调用线程会被阻塞直到信号量的值大于 0。V 操作将信号量的值加 1。如果有其它线程正在等待该信号量那么会唤醒其中一个等待的线程。
6.信号量接口
功能: 初始化信号量
原型: int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:sem: 信号量对象的指针pshared: 0表示线程间共享, 非0表示进程间共享value: 信号量的初始值功能: 销毁化信号量
原型: int sem_destroy(sem_t *sem);功能: P操作
原型: int sem_wait(sem_t *sem);功能: V操作
原型: int sem_post(sem_t *sem);二元信号量可用的资源只有一份只允许一个线程访问共享资源类似互斥锁。当信号量的值为 1 时表示资源可用当值为 0 时表示资源已被占用。如下用二元信号量写法代替互斥锁
#include iostream
#include unistd.h
#include semaphore.hint ticket 1000;
sem_t sem;void* Ticket(void *args)
{std::string name static_castchar*(args);while(true){sem_wait(sem); // 申请信号量if(ticket 0){usleep(1000);std::cout name buys a ticket: ticket std::endl;ticket--;sem_post(sem); // 释放信号量}else{sem_post(sem); // 释放信号量break;}}return nullptr;
}int main()
{sem_init(sem, 0, 1);pthread_t t1, t2, t3;pthread_create(t1, nullptr, Ticket, (void*)thread-1);pthread_create(t2, nullptr, Ticket, (void*)thread-2);pthread_create(t3, nullptr, Ticket, (void*)thread-3);pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);sem_destroy(sem);return 0;
}xzyhcss-ecs-b3aa:~$ ./ticket
thread-1 buys a ticket: 1000
thread-1 buys a ticket: 999
thread-1 buys a ticket: 998
...
thread-1 buys a ticket: 3
thread-1 buys a ticket: 2
thread-1 buys a ticket: 1当允许最多3个线程并发访问共享资源时如何确保数据安全问题加锁
#include iostream
#include unistd.h
#include semaphore.h
#include Mutex.hppint ticket 1000;
sem_t sem;
Mutex mutex;void* Ticket(void *args)
{std::string name static_castchar*(args);while(true){sem_wait(sem); // 申请信号量LockGuard lockguard(mutex); // RAII方式加锁if(ticket 0){usleep(1000);std::cout name buys a ticket: ticket std::endl;ticket--;sem_post(sem); // 释放信号量}else{sem_post(sem); // 释放信号量break;}}return nullptr;
}int main()
{sem_init(sem, 0, 3); // 允许最多3个线程访问共享资源pthread_t t1, t2, t3;pthread_create(t1, nullptr, Ticket, (void*)thread-1);pthread_create(t2, nullptr, Ticket, (void*)thread-2);pthread_create(t3, nullptr, Ticket, (void*)thread-3);pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);sem_destroy(sem);return 0;
}xzyhcss-ecs-b3aa:~$ ./ticket
thread-1 buys a ticket: 1000
thread-1 buys a ticket: 999
thread-1 buys a ticket: 998
...
thread-1 buys a ticket: 3
thread-1 buys a ticket: 2
thread-1 buys a ticket: 17.信号量封装
// Sem.hpp
#pragma once
#include semaphore.h
namespace SemMoudel
{class Sem{public:Sem(int value 1): _value(value){::sem_init(_sem, 0, _value);}~Sem(){::sem_destroy(_sem);}void P(){::sem_wait(_sem);}void V(){::sem_post(_sem);}private:sem_t _sem;int _value;};
}8.生产者 - 消费者模型
单线程通常是串行执行多线程通常是单核CPU并发、多核CPU并行。并发比串行效率高并行比并发效率高。多线程中的某一个线程在执行IO操作时线程挂起会释放 CPU 资源允许操作系统将 CPU 时间片分配给其它就绪的线程也就是以并发的方式提高 CPU 效率。
生产者 - 消费者模型多线程或多进程协作设计模式用于解决生产者和消费者之间数据交互问题。生产者和消费者彼此之间不直接通讯而通过缓冲区来进行通讯。生产者首先检查缓冲区是否还有空闲空间如果有则生产一个数据项并将其放入缓冲区如果缓冲区已满生产者需要阻塞等待直到有消费者从缓冲区中取走数据腾出空间。消费者检查缓冲区是否有可用的数据项如果有则从缓冲区中取出一个数据项进行处理如果缓冲区为空消费者需要等待直到生产者向缓冲区中添加了新的数据项。这个缓冲区用来给生产者和消费者解耦的。
生产者 - 消费者模型效率高的原因
解耦生产者和消费者不需要直接交互它们只需要与缓冲区进行交互从而降低了两者之间的耦合度使得系统的可维护性和可扩展性得到提高。支持并发但临界区需要同步互斥防止并发造成数据不一致问题缓存机制平衡了生产和消费的速度差异。
总结
一个交易场所临界资源。两个角色生产者和消费者。三种关系生产者与生产者互斥关系。消费者与消费者互斥关系。生产者与消费者互斥和同步关系(生产者需要等待缓冲区有空闲空间才能生产数据消费者需要等待缓冲区有数据才能消费) 1.基于 Blocking Queue 的生产者 - 消费者模型
阻塞队列一种常用于实现生产者 - 消费者模型的数据结构。其与普通的队列区别当队列为空时从队列获取元素的操作将会被阻塞直到队列中被放入了元素。当队列满时往队列里存放元素的操作也会被阻塞直到有元素被从队列中取出。生产者和消费者存在同步关系。考虑使用 互斥锁 条件变量 实现阻塞队列 利用 pthread.h 线程库的 Mutex 和 Cond 实现基于 Blocking Queue 的生产者 - 消费者模型
// BlockQueue.hpp
#pragma once#include iostream
#include queue
#include pthread.hnamespace BlockQueueMoudel
{static int gcap 10;template class Tclass BlockQueue{private:bool IsFull() { return _q.size() _cap; }bool IsEmpty() { return _q.empty(); }public:BlockQueue(int cap gcap): _cap(cap), _p_wait_num(0), _c_wait_num(0){pthread_mutex_init(_mutex, nullptr);pthread_cond_init(_producer_cond, nullptr);pthread_cond_init(_consumer_cond, nullptr);}~BlockQueue(){pthread_mutex_destroy(_mutex);pthread_cond_destroy(_producer_cond);pthread_cond_destroy(_consumer_cond);}// 生产者void Push(const T in){pthread_mutex_lock(_mutex);// 生产数据是有条件的, 容量不能为满while (IsFull()) // while替代if: 防止伪唤醒{std::cout 生产者进入等待... std::endl;_p_wait_num;// 生产者线程等待时, 需解锁让线程挂起, 调度消费者线程, 消费一个数据后,// 通知生产者线程条件满足(可以生产数据), 接着需要再阻塞等待加锁pthread_cond_wait(_producer_cond, _mutex); // 等待必须在临界区: IsFull访问了临界资源// wait完成后: 生产者线程被唤醒 重新申请并持有锁(仍在临界区)_p_wait_num--;std::cout 生产者已被唤醒... std::endl;}// IsFull()不满足 || 生产者线程被唤醒_q.push(in);// 肯定有数据: 若消费者线程在等待, 直接唤醒if (_c_wait_num){// std::cout 唤醒消费者 std::endl;pthread_cond_signal(_consumer_cond);}// 唤醒在解锁后也可以pthread_mutex_unlock(_mutex);}// 消费者void Pop(T *out){pthread_mutex_lock(_mutex);// 消费数据是有条件的, 容量不能为空while (IsEmpty()) // while替代if: 防止伪唤醒{std::cout 消费者进入等待... std::endl;_c_wait_num;// 消费者线程等待时, 需解锁让线程挂起, 调度生产者线程, 生产一个数据后,// 通知消费者线程条件满足(可以消费数据), 接着需要再阻塞等待加锁pthread_cond_wait(_consumer_cond, _mutex); // 等待必须在临界区: IsEmpty访问了临界资源// wait完成后: 消费者线程被唤醒 重新申请并持有锁(仍在临界区)_c_wait_num--;std::cout 消费者已被唤醒... std::endl;}// IsEmpty()不满足 || 消费者线程被唤醒*out _q.front();_q.pop();// 肯定有空间: 若生产者线程在等待, 直接唤醒if (_p_wait_num){// std::cout 唤醒生产者 std::endl;pthread_cond_signal(_producer_cond);}// 唤醒在解锁后也可以pthread_mutex_unlock(_mutex);}private:std::queueT _q; // 保存数据, 临界资源int _cap; // bq的最大容量pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _producer_cond; // 生产者条件变量pthread_cond_t _consumer_cond; // 消费者条件变量int _p_wait_num; // 生产者线程等待个数int _c_wait_num; // 消费者线程等待个数};
}// Main.cc
#include unistd.h
#include BlockQueue.hpp
using namespace BlockQueueMoudel;// 生产者
void *Producer(void *args)
{BlockQueueint *bq static_castBlockQueueint *(args);int data 10;while (true){// sleep(2);// 1.生产到bq队列中bq-Push(data);std::cout producer 生产了一个数据: data std::endl;// 2.更新下一个生产的数据data;}
}// 消费者
void *Consumer(void *args)
{BlockQueueint *bq static_castBlockQueueint *(args);while (true){sleep(2);// 1.从bq队列获取数据int data;bq-Pop(data);// 2.消费数据std::cout consumer 消费了一个数据: data std::endl;}
}int main()
{BlockQueueint *bq new BlockQueueint(5);// 单生产者、单消费者pthread_t p, c;pthread_create(p, nullptr, Producer, bq);pthread_create(c, nullptr, Consumer, bq);pthread_join(p, nullptr);pthread_join(c, nullptr);// 多生产者、多消费者// pthread_t p1, p2, p3, c1, c2;// pthread_create(p1, nullptr, Producer, bq);// pthread_create(p2, nullptr, Producer, bq);// pthread_create(p3, nullptr, Producer, bq);// pthread_create(c1, nullptr, Consumer, bq);// pthread_create(c2, nullptr, Consumer, bq);// pthread_join(p1, nullptr);// pthread_join(p2, nullptr);// pthread_join(p3, nullptr);// pthread_join(c1, nullptr);// pthread_join(c2, nullptr);delete bq;return 0;
}xzyhcss-ecs-b3aa:~$ ./bq
producer 生产了一个数据: 10
producer 生产了一个数据: 11
producer 生产了一个数据: 12
producer 生产了一个数据: 13
producer 生产了一个数据: 14
生产者进入等待...
consumer 消费了一个数据: 10
生产者已被唤醒...
producer 生产了一个数据: 15
生产者进入等待...
consumer 消费了一个数据: 11
生产者已被唤醒...
producer 生产了一个数据: 16
生产者进入等待...利用自己封装的 Mutex 和 Cond 实现基于 Blocking Queue 的生产者消费者模型
// BlockQueue.hpp
#pragma once#include iostream
#include queue
#include pthread.h
#include Mutex.hpp
#include Cond.hpp
using namespace MutexModule;
using namespace CondModule;namespace BlockQueueMoudel
{static int gcap 10;template class Tclass BlockQueue{private:bool IsFull() { return _q.size() _cap; }bool IsEmpty() { return _q.empty(); }public:BlockQueue(int cap gcap): _cap(cap), _p_wait_num(0), _c_wait_num(0){}~BlockQueue() {}// 生产者void Push(const T in){LockGuard lockguard(_mutex);while (IsFull()){std::cout 生产者进入等待... std::endl;_p_wait_num;_producer_cond.Wait(_mutex);_p_wait_num--;std::cout 生产者已被唤醒... std::endl;}_q.push(in);if (_c_wait_num){// std::cout 唤醒消费者 std::endl;_consumer_cond.Signal();}}// 消费者void Pop(T *out){LockGuard lockguard(_mutex);while (IsEmpty()){std::cout 消费者进入等待... std::endl;_c_wait_num;_consumer_cond.Wait(_mutex);_c_wait_num--;std::cout 消费者已被唤醒... std::endl;}*out _q.front();_q.pop();if (_p_wait_num){// std::cout 唤醒生产者 std::endl;_producer_cond.Signal();}}private:std::queueT _q; // 保存数据, 临界资源int _cap; // bq的最大容量Mutex _mutex; // 互斥锁Cond _producer_cond; // 生产者条件变量Cond _consumer_cond; // 消费者条件变量int _p_wait_num; // 生产者线程等待个数int _c_wait_num; // 消费者线程等待个数};
}// Main.cc
#include iostream
#include vector
#include functional
#include unistd.h
#include pthread.h
#include BlockQueue.hppusing namespace BlockQueueMoudel;
using task_t std::functionvoid();std::vectortask_t tasks;
void Sql() { std::cout 我是一个数据库任务 std::endl; }
void UpLoad() { std::cout 我是一个上传任务 std::endl; }
void DownLoad() { std::cout 我是一个下载任务 std::endl; }// 生产者
void *Producer(void *args)
{BlockQueuetask_t *bq static_castBlockQueuetask_t *(args);int cnt 0;while (true){// sleep(2);// 1.从tasks数组中获取任务bq-Push(tasks[cnt % 3]);cnt;// 2.生产任务std::cout producer 生产了一个任务 std::endl;}
}// 消费者
void *Consumer(void *args)
{BlockQueuetask_t *bq static_castBlockQueuetask_t *(args);while (true){sleep(2);// 1.从bq队列中获取任务task_t t;bq-Pop(t);// 2.处理任务t();std::cout consumer 处理完成一个任务 std::endl;}
}int main()
{tasks.push_back(Sql);tasks.push_back(UpLoad);tasks.push_back(DownLoad);BlockQueuetask_t *bq new BlockQueuetask_t(5);// 单生产者、单消费者pthread_t p, c;pthread_create(p, nullptr, Producer, bq);pthread_create(c, nullptr, Consumer, bq);pthread_join(p, nullptr);pthread_join(c, nullptr);delete bq;return 0;
}xzyhcss-ecs-b3aa:~$ ./bq
producer 生产了一个任务
producer 生产了一个任务
producer 生产了一个任务
producer 生产了一个任务
producer 生产了一个任务
生产者进入等待...
我是一个数据库任务
consumer 处理完成一个任务
生产者已被唤醒...
producer 生产了一个任务
生产者进入等待...
我是一个上传任务
consumer 处理完成一个任务
生产者已被唤醒...
producer 生产了一个任务
生产者进入等待...
我是一个下载任务
consumer 处理完成一个任务
生产者已被唤醒...
producer 生产了一个任务
生产者进入等待...2.基于 Ring Queue 的生产者 - 消费者模型
环型队列实现数据高效生产和消费的经典设计模式用于解决多线程/多进程环境下生产者和消费者之间的数据共享与同步问题。生产者负责将数据放入循环队列而消费者则从队列中取出数据进行处理。为了确保线程安全和避免数据竞争需要使用同步机制来控制对队列的访问。当队列已满时生产者需要等待当队列为空时消费者需要等待。考虑使用 信号量 实现循环队列 通过生产者生产数据空间-1数据1消费者消费数据数据-1空间1。实现数据和空间的平衡
单生产者单消费者
当队列为空/满 时生产者和消费者访问同一个位置(资源)同步互斥当队列非空/满 时生产者和消费者访问不同的位置(资源)并发
多单生产者单消费者
生产者和生产者互斥关系消费者和消费者互斥关系。生产者和消费者同意满足上面的关系。
利用自己封装的 Mutex 和 Sem 实现基于 Ring Queue 的生产者消费者模型
// RingQueue.hpp
#pragma once#include iostream
#include vector
#include pthread.h
#include Sem.hpp
#include Mutex.hppusing namespace SemMoudel;
using namespace MutexModule;namespace RingQueueMoudel
{template class Tclass RingQueue{public:RingQueue(int cap): _rq(cap), _cap(cap), _p_pos(0), _c_pos(0), _data_sem(0), _space_sem(cap){}~RingQueue(){}// 生产者void Push(const T in){// 当队满: 阻塞, 直到消费者消费数据_space_sem.P(); // 申请空间{// 先申请信号量, 再申请锁: 此时信号量的申请是并行的, 效率高一点LockGuard lockguard(_p_mutex);_rq[_p_pos] in;_p_pos;_p_pos % _cap;}_data_sem.V(); // 释放数据}// 消费者void Pop(T *out){// 当队空: 阻塞, 直到生产者生产数据_data_sem.P(); // 申请数据{// 先申请信号量, 再申请锁: 此时信号量的申请是并行的, 效率高一点LockGuard lockguard(_c_mutex);*out _rq[_c_pos];_c_pos;_c_pos % _cap;}_space_sem.V(); // 释放空间}private:std::vectorT _rq; // 环型队列, 临界资源int _cap; // 最大容量int _p_pos; // 生产者位置int _c_pos; // 消费者位置Sem _data_sem; // 数据信号量Sem _space_sem; // 空间信号量Mutex _p_mutex; // 生产者的锁Mutex _c_mutex; // 消费者的锁};
}// Main.cc
#include iostream
#include pthread.h
#include unistd.h
#include RingQueue.hppusing namespace RingQueueMoudel;void *Producer(void *args)
{RingQueueint *rq static_castRingQueueint *(args);int data 0;while(true){// 1.获取数据// 2.生产数据rq-Push(data);std::cout producer 生产了一个数据 data std::endl;data;}
}void *Consumer(void *args)
{RingQueueint *rq static_castRingQueueint *(args);while(true){sleep(1);// 1.消费数据int data;rq-Pop(data);// 2.处理数据std::cout consumer 消费了一个数据 data std::endl;}
}int main()
{RingQueueint *rq new RingQueueint(5);pthread_t p, c;pthread_create(p, nullptr, Producer, rq);pthread_create(c, nullptr, Consumer, rq);pthread_join(p, nullptr);pthread_join(c, nullptr);// pthread_t p1, p2, c1, c2, c3;// pthread_create(p1, nullptr, Producer, rq);// pthread_create(p2, nullptr, Producer, rq);// pthread_create(c1, nullptr, Consumer, rq);// pthread_create(c2, nullptr, Consumer, rq);// pthread_create(c3, nullptr, Consumer, rq);// pthread_join(p1, nullptr);// pthread_join(p2, nullptr);// pthread_join(c1, nullptr);// pthread_join(c2, nullptr);// pthread_join(c3, nullptr);delete rq;return 0;
}xzyhcss-ecs-b3aa:~$ ./rq
producer 生产了一个数据0
producer 生产了一个数据1
producer 生产了一个数据2
producer 生产了一个数据3
producer 生产了一个数据4
consumer 消费了一个数据0
producer 生产了一个数据5
consumer 消费了一个数据1
producer 生产了一个数据6
consumer 消费了一个数据2
producer 生产了一个数据7三.线程池
在写线程池之前我们要做如下准备
准备线程的封装。准备锁和条件变量的封装。引入日志对线程进行封装。
1.日志和策略模式
日志记录系统和软件运行中发生事件的文件主要作用是监控运行状态、记录异常信息帮助快速定位问题并支持程序员进行问题修复。它是系统维护、故障排查和安全管理的重要工具。日志格式中的某些指标是必须有时间戳、日志等级、日志内容。存在几个指标是可选的文件名行号、进程线程相关id信息等。日志有现成的解决方案spdlog、glog、Boost.Log、Log4cxx等。日志位于 /var/log/ 路径下设计模式在软件开发过程中针对反复出现的问题所总结归纳出的通用解决方案。
策略模式
抽象策略类(基类)包含一个或多个纯虚函数用于声明具体策略类需要实现的接口。具体策略类(派生类)重写了抽象策略类中定义的接口每个具体策略类代表一个具体的接口。上下文类持有一个抽象策略类的指针/引用负责根据需要选择和使用具体的策略类。
抽象策略类的作用定义统一接口运行时多态提高代码的可维护性和可扩展性。
这里采用 设计模式 - 策略模式 来进行日志的设计我们想要的日志格式如下
[可读性很好的时间] [日志等级] [进程pid] [打印对应日志的文件名][行号] - 消息内容, 支持可变参数[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [9] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [10] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [11] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [12] - hello world// Log.hpp
#pragma once#include iostream
#include cstdio
#include string
#include filesystem // C17文件系统
#include fstream // 文件流
#include sstream // 字符串流
#include memory
#include unistd.h
#include time.h
#include Mutex.hppnamespace LogModule
{using namespace MutexModule;// 获取系统时间std::string CurrentTime(){time_t time_stamp ::time(nullptr); // 获取时间戳struct tm curr;localtime_r(time_stamp, curr); // 将时间戳转化为可读性强的信息char buffer[1024];snprintf(buffer, sizeof(buffer), %4d-%02d-%02d %02d:%02d:%02d,curr.tm_year 1900,curr.tm_mon 1,curr.tm_mday,curr.tm_hour,curr.tm_min,curr.tm_sec);return buffer;}// 日志文件: 默认路径和默认文件名const std::string defaultlogpath ./log/;const std::string defaultlogname log.txt;// 日志等级enum class LogLevel{DEBUG 1,INFO,WARNING,ERROR,FATAL};std::string Level2String(LogLevel level){switch (level){case LogLevel::DEBUG:return DEBUG;case LogLevel::INFO:return INFO;case LogLevel::WARNING:return WARNING;case LogLevel::ERROR:return ERROR;case LogLevel::FATAL:return FATAL;default:return NONE;}}// 3. 策略模式: 刷新策略class LogStrategy{public:virtual ~LogStrategy() default; //???// 纯虚函数: 无法实例化对象, 派生类可以重载该函数, 实现不同的刷新方式virtual void SyncLog(const std::string message) 0;};// 3.1 控制台策略class ConsoleLogStrategy : public LogStrategy{public:ConsoleLogStrategy() {}~ConsoleLogStrategy() {}void SyncLog(const std::string message) override{LockGuard lockguard(_mutex);std::cout message std::endl;}private:Mutex _mutex;};// 3.2 文件级(磁盘)策略class FileLogStrategy : public LogStrategy{public:FileLogStrategy(const std::string logpath defaultlogpath, const std::string logname defaultlogname): _logpath(logpath), _logname(logname){// 判断_logpath目录是否存在if (std::filesystem::exists(_logpath)){return;}try{std::filesystem::create_directories(_logpath);}catch (std::filesystem::filesystem_error e){std::cerr e.what() \n;}}~FileLogStrategy() {}void SyncLog(const std::string message) override{LockGuard lockguard(_mutex);std::string log _logpath _logname;std::ofstream out(log, std::ios::app); // 以追加的方式打开文件if (!out.is_open()){return;}out message \n; // 将信息刷新到out流中out.close();}private:std::string _logpath;std::string _logname;Mutex _mutex;};// 4. 日志类: 构建日志字符串, 根据策略进行刷新class Logger{public:Logger(){// 默认往控制台上刷新_strategy std::make_sharedConsoleLogStrategy();}~Logger() {}void EnableConsoleLog(){_strategy std::make_sharedConsoleLogStrategy();}void EnableFileLog(){_strategy std::make_sharedFileLogStrategy();}// 内部类: 记录完整的日志信息class LogMessage{public:LogMessage(LogLevel level, const std::string filename, int line, Logger logger): _currtime(CurrentTime()), _level(level), _pid(::getpid()), _filename(filename), _line(line), _logger(logger){std::stringstream ssbuffer;ssbuffer [ _currtime ] [ Level2String(_level) ] [ _pid ] [ _filename ] [ _line ] - ;_loginfo ssbuffer.str();}~LogMessage(){if(_logger._strategy){_logger._strategy-SyncLog(_loginfo);}}template class TLogMessage operator(const T info){std::stringstream ssbuffer;ssbuffer info;_loginfo ssbuffer.str();return *this;}private:std::string _currtime; // 当前日志时间LogLevel _level; // 日志水平pid_t _pid; // 进程pidstd::string _filename; // 文件名uint32_t _line; // 日志行号Logger _logger; // 负责根据不同的策略进行刷新std::string _loginfo; // 日志信息};// 故意拷贝, 形成LogMessage临时对象, 后续在被时会被持续引用,// 直到完成输入才会自动析构临时LogMessage, 至此完成了日志的刷新,// 同时形成的临时对象内包含独立日志数据, 未来采用宏替换, 获取文件名和代码行数LogMessage operator()(LogLevel level, const std::string filename, int line){return LogMessage(level, filename, line, *this);}private:// 纯虚类不能实例化对象, 但是可以定义指针std::shared_ptrLogStrategy _strategy; // 日志刷新策略方案};// 定义全局logger对象Logger logger;// 编译时进行宏替换: 方便随时获取行号和文件名
#define LOG(level) logger(level, __FILE__, __LINE__)// 提供选择使用何种日志策略的方法
#define ENABLE_CONSOLE_LOG() logger.EnableConsoleLog()
#define ENABLE_FILE_LOG() logger.EnableFileLog()
}// Main.cc
#include iostream
#include Log.hpp
using namespace LogModule;int main()
{// 往显示器中写入ENABLE_CONSOLE_LOG();LOG(LogLevel::DEBUG) hello world;LOG(LogLevel::DEBUG) hello world;LOG(LogLevel::DEBUG) hello world;LOG(LogLevel::DEBUG) hello world;// 往文件中写入ENABLE_FILE_LOG();LOG(LogLevel::DEBUG) hello world;LOG(LogLevel::DEBUG) hello world;LOG(LogLevel::DEBUG) hello world;LOG(LogLevel::DEBUG) hello world;return 0;
}xzyhcss-ecs-b3aa:~$ ./testLog
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [9] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [10] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [11] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [12] - hello world