河南送变电建设有限公司网站,广州科技有限公司,盐田区网站建设,网站建设现状调查研究参考项目
https://github.com/progschj/ThreadPool
源码分析
// 常规头文件保护宏, 避免重复 include
#ifndef THREAD_POOL_H
#define THREAD_POOL_H// 线程池, 存储线程对象;
#include vector// 任务队列, 双向都可操作队列, queue 不能删除首个元素
#include …参考项目
https://github.com/progschj/ThreadPool
源码分析
// 常规头文件保护宏, 避免重复 include
#ifndef THREAD_POOL_H
#define THREAD_POOL_H// 线程池, 存储线程对象;
#include vector// 任务队列, 双向都可操作队列, queue 不能删除首个元素
#include queue// 智能指针
#include memory// c11 线程对象
#include thread// 锁保护队列多线程任务添加, 删除的安全;
#include mutex// 条件变量用来 condition wait 和 notify; 即事件的通知和阻塞等待
#include condition_variable// future 用来获取更友好的封装任务(函数), 并获取返回值;
#include future// function 任务队列
#include functional// 非法场景抛出异常
#include stdexcept// 为什么不用模板? 因为 enqueue 是模板, 可以兼容几乎所有场景;
class ThreadPool {
public:// 设置线程池大小ThreadPool(size_t);// 添加函数, function 和 argstemplateclass F, class... Argsauto enqueue(F f, Args... args) - std::futuretypename std::result_ofF(Args...)::type;// 吸狗函数~ThreadPool();
private:// need to keep track of threads so we can join themstd::vector std::thread workers;// the task queuestd::queue std::functionvoid() tasks;// synchronizationstd::mutex queue_mutex;std::condition_variable condition;bool stop;
};// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads): stop(false)
{for(size_t i 0;ithreads;i)// 添加任务, 用 emplace_back 的形势, 避免某些类型不支持拷贝;workers.emplace_back(// lambda 捕获 this 对象, 用于操作任务队列;[this]{// 死循环等待任务for(;;){// 任务获取 void() 类型 统一封装, 后面用 packaged_task 封装不同的std::functionvoid() task;{// 构建对象用于 conditionstd::unique_lockstd::mutex lock(this-queue_mutex);// 等待线程池停止, 或者有任务;this-condition.wait(lock, [this]{ return this-stop || !this-tasks.empty(); });// 如果有任务 则 false, 即使停止也需要执行完任务之后再停止队列// 如果请求停止, 且没有任务则终止;if(this-stop this-tasks.empty())return;// 获取第一个任务;task std::move(this-tasks.front());this-tasks.pop();}// 执行获取到任务;task();}});
}// add new work item to the pool
templateclass F, class... Args
auto ThreadPool::enqueue(F f, Args... args) - std::futuretypename std::result_ofF(Args...)::type
{// 模板获取函数返回值类型using return_type typename std::result_ofF(Args...)::type;// 将对象通过 bind 打包成可调用对象, 并封装到 packeaged_task;auto task std::make_shared std::packaged_taskreturn_type() (std::bind(std::forwardF(f), std::forwardArgs(args)...));// 生成获取函数返回值的具柄对象;std::futurereturn_type res task-get_future();{// 加锁添加元素;std::unique_lockstd::mutex lock(queue_mutex);// dont allow enqueueing after stopping the poolif(stop)throw std::runtime_error(enqueue on stopped ThreadPool);// 用 lambda 再封装是为了统一函数格式 void()// task 是 shared_ptr, 避免拷贝;tasks.emplace([task](){ (*task)(); });}// 提醒阻塞线程有新的任务;condition.notify_one();// 返回获取函数返回值的具柄, 这样可以获取返回值;return res;
}// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{{// 加锁请求停止std::unique_lockstd::mutex lock(queue_mutex);stop true;}// 提醒所有阻塞线程, 需要停止线程池condition.notify_all();// join每个 thread 对象; 可以用 std::future 来作为 thread 对象, 让 std 管理生命周期;// 除非开发者有更加细致的管理, 如 优先级, 栈, 添加属性之类的操作;for(std::thread worker: workers)worker.join();
}#endif使用案例一
// create thread pool with 4 worker threads
ThreadPool pool(4);// enqueue and store future
auto result pool.enqueue([](int answer) { return answer; }, 42);// get result from future
std::cout result.get() std::endl;使用案例二
#include iostream
#include vector
#include chrono#include ThreadPool.hint main()
{ThreadPool pool(4);std::vector std::futureint results;for(int i 0; i 8; i) {results.emplace_back(pool.enqueue([i] {std::cout hello i std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));std::cout world i std::endl;return i*i;}));}for(auto result: results)std::cout result.get() ;std::cout std::endl;return 0;
}无注释版本
#ifndef THREAD_POOL_H
#define THREAD_POOL_H#include vector
#include queue
#include memory
#include thread
#include mutex
#include condition_variable
#include future
#include functional
#include stdexceptclass ThreadPool {
public:ThreadPool(size_t);templateclass F, class... Argsauto enqueue(F f, Args... args) - std::futuretypename std::result_ofF(Args...)::type;~ThreadPool();
private:// need to keep track of threads so we can join themstd::vector std::thread workers;// the task queuestd::queue std::functionvoid() tasks;// synchronizationstd::mutex queue_mutex;std::condition_variable condition;bool stop;
};// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads): stop(false)
{for(size_t i 0;ithreads;i)workers.emplace_back([this]{for(;;){std::functionvoid() task;{std::unique_lockstd::mutex lock(this-queue_mutex);this-condition.wait(lock,[this]{ return this-stop || !this-tasks.empty(); });if(this-stop this-tasks.empty())return;task std::move(this-tasks.front());this-tasks.pop();}task();}});
}// add new work item to the pool
templateclass F, class... Args
auto ThreadPool::enqueue(F f, Args... args) - std::futuretypename std::result_ofF(Args...)::type
{using return_type typename std::result_ofF(Args...)::type;auto task std::make_shared std::packaged_taskreturn_type() (std::bind(std::forwardF(f), std::forwardArgs(args)...));std::futurereturn_type res task-get_future();{std::unique_lockstd::mutex lock(queue_mutex);// dont allow enqueueing after stopping the poolif(stop)throw std::runtime_error(enqueue on stopped ThreadPool);tasks.emplace([task](){ (*task)(); });}condition.notify_one();return res;
}// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{{std::unique_lockstd::mutex lock(queue_mutex);stop true;}condition.notify_all();for(std::thread worker: workers)worker.join();
}#endif