关键词网站建设公司,长沙做个网站多少钱,wordpress会员提成插件,吉林省吉林市有几个区一.项目介绍
本项目——高并发内存池#xff0c;是通过学习并模仿简化 google 的一个开源项目 tcmalloc #xff0c;全称 Thread-Caching Malloc#xff0c;即线程缓存的malloc#xff0c;模拟实现了一个自己的高并发内存池#xff0c;用于高效的多线程内存管理#xff…一.项目介绍
本项目——高并发内存池是通过学习并模仿简化 google 的一个开源项目 tcmalloc 全称 Thread-Caching Malloc即线程缓存的malloc模拟实现了一个自己的高并发内存池用于高效的多线程内存管理替代系统的内存分配相关的函数malloc、free。 二.了解内存池
1.池化技术
所谓“池化技术”就是在计算机中模仿蓄水池通过蓄水池我们不再需要一次一次的打水而是一次性积攒大量的水来供我们使用。
无论是现实中一次一次的打水还是计算机中一次又一次的申请资源都是需要很大的开销的所以我们提前申请好一大批资源在使用时调用使用后返还这样就能大大提高程序运行效率。 2.内存池
内存池即程序预先从操作系统申请一块足够大内存此后当程序中需要申请内存的时候不是直接向操作系统申请而是直接从内存池中获取同理当程序释放内存的时候并不真正将内存返回给操作系统而是返回内存池。当程序退出(或者特定时间)时内存池才将之前申请的内存真正释放。
本项目所做的高并发内存池出了解决效率问题之外还解决内存碎片的问题。
何为内存碎片 在这块地址空间中先是由四个部分分别占用并占满整个空间随后 vector 和 list 释放了空间此时该地址空间剩余了384Byte的空间但是此时如果申请了大于256Byte的空间能成功吗
答案是不能因为申请空间必须是连续的很明显上述的两块空间并不连续这时候这两块空间就体现出了碎片化的问题。这种碎片化称为外碎片。 3.malloc
在 C/C 中我们要动态申请内存都是通过malloc去申请内存但是我们要知道实际我们不是直接去堆获取内存的而malloc就是一个内存池。malloc() 相当于向操作系统“批发”了一块较大的内存空间然后“零售”给程序用。当全部“售完”或程序有大量的内存需求时再根据实际需求向操作系统“进货”。 三.整体框架设计
现代很多的开发环境都是多核多线程在申请内存的场景下必然存在激烈的锁竞争问题。
所以高并发内存池除了解决性能问题和内存碎片问题之外还解决多线程下锁竞争问题。
高并发内存池主要由以下三部分构成 thread cache线程缓存是每个线程独有的用于小于256KB的内存的分配线程从这里申请内存不需要加锁每个线程独享一个cache这就是这个并发线程池高效的地方。 central cache中心缓存是所有线程所共享thread cache是按需从central cache中获取的对象。central cache会回收thread cache中的对象避免一个线程占用了太多的内存而其他线程的内存吃紧达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的所以从这里取内存对象是需要加锁首先这里用的是桶锁其次只有thread cache没有内存对象时才会找central cache所以这里竞争不会很激烈。 page cache页缓存是在central cache缓存更下一层的缓存存储的内存是以页为单位存储及分配的当central cache没有内存对象时从page cache分配出一定数量的page并切割成定长大小的小块内存分配给central cache。当一个span的几个跨度页的对象都回收以后page cache 会回收central cache满足条件的span对象并且合并相邻的页组成更大的页缓解内存碎片的问题。
也就是说高并发内存池是由三部分缓存空间共同维护从而达到高性能的内存分配。 1.thread cache整体设计
thread cache是哈希桶结构每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象这样每个线程在这里获取对象和释放对象时是无锁的。 thread cache基础结构
class ThreadCache
{
public:private:FreeList _freelists[NFREELISTS];//存储每个自由链表的哈希桶
};// TLS thread local storage 线程自己的静态变量
static _declspec(thread) ThreadCache* pTLSThreadCache nullptr; 由于 thread cache 未来要被线程调用但是多个线程在一个进程中会共享资源这与我们设计的每个线程拥有自己独立的 thread cache 不合所以我们通过TLS创建每个线程独立的 thread cache。
自由链表的基础结构
//获取链表节点的下一个节点
static void* NextObj(void* obj)
{return *(void**)obj;
}//用于管理切分好的内存块的自由链表
class FreeList
{
public://获取内存块void Push(void* obj){assert(obj);//头插//*(void**)obj _freelist;NextObj(obj) _freelist;_freelist obj;}//分发内存块void* Pop(){assert(_freelist);//头删void* obj _freelist;_freelist NextObj(obj);return obj;}//判空bool Empty(){return _freelist nullptr;}private:void* _freelist nullptr;
};
链表中的每个节点的通过头几个字节构成 void* 指针连接的。 既然是哈希桶那桶里必然会有很多的自由链表那么这么多的自由链表当内存空间被获取时该选择从哪个自由链表中获取呢
此处我们引入“内碎片”的概念所谓内碎片就是我分给你了一段空间但是这段空间你不一定能完全利用会有部分空间剩余这些剩余的空间就称为“内碎片”。
相较于“外碎片”前者是整个内存空间的剩余而“内碎片”则是已经借出去的空间的剩余。
为了控制内碎片的浪费我们需要根据需要的空间大小来合理划分出哈希桶中自由链表的个数即其分别连接多大空间的块内存。 // 整体控制在最多10%左右的内碎片浪费 // [1,128] 8byte对齐 freelist[0,16) // [1281,1024] 16byte对齐 freelist[16,72) // [10241,8*1024] 128byte对齐 freelist[72,128) // [8*10241,64*1024] 1024byte对齐 freelist[128,184) // [64*10241,256*1024] 8*1024byte对齐 freelist[184,208) 我们通过将内碎片的整体的浪费控制在10%左右根据内存分配空间最大为256KB将借用的内存大小分为5个范围然后设定5个对齐规则将哈希桶总共添加208个自由链表并将这些自由链表分为5大类。 那么按照这种划分方式该如何得到该要分配的内存空间在哈希桶中具体的链表下标呢
所以根据要分配的内存空间的大小经过下述运算便能得出结果 //自由链表桶映射下标计算static inline size_t _Index(size_t bytes, size_t align_shift){return ((bytes (1 align_shift) - 1) align_shift) - 1;}static inline size_t Index(size_t bytes){assert(bytes MAX_BYTES);// 每个区间有多少个链表static int group_array[4] { 16, 56, 56, 56 };if (bytes 128)return _Index(bytes, 3);//传入2的次幂else if (bytes 1024)return _Index(bytes - 128, 4) group_array[0];//下标要加上所在的区间的前边的区间数else if (bytes 8 * 1024)return _Index(bytes - 1024, 7) group_array[1] group_array[0];else if (bytes 64 * 1024)return _Index(bytes - 8 * 1024, 10) group_array[2] group_array[1] group_array[0];else if (bytes 256 * 1024)return _Index(bytes - 64 * 1024, 13) group_array[3] group_array[2] group_array[1] group_array[0];else{assert(false);return -1;}}
通过下述代码计算要分配的内存空间的对齐大小
//对齐大小计算static inline size_t _RoundUp(size_t bytes, size_t align){return ((bytes align - 1) ~(align - 1));}static inline size_t RoundUp(size_t bytes){if (bytes 128)return _RoundUp(bytes, 8);else if (bytes 1024)return _RoundUp(bytes, 16);else if (bytes 8 * 1024)return _RoundUp(bytes, 128);else if (bytes 64 * 1024)return _RoundUp(bytes, 1024);else if (bytes 256 * 1024)return _RoundUp(bytes, 8 * 1024);elsereturn _RoundUp(bytes, 1 PAGE_SHIFT);
通过这两段代码我们就可以实现内存对象的申请和释放规则如下
申请内存
当内存申请size256KB先获取到线程本地存储的thread cache对象计算size映射的哈希桶自由链表下标i。 如果自由链表_freeLists[i]中有对象则直接Pop一个内存对象返回。 如果_freeLists[i]中没有对象时则批量从central cache中获取一定数量的对象插入到自由链表并返回一个对象。
释放内存
当释放内存小于256k时将内存释放回thread cache计算size映射自由链表桶位置i将对象Push到_freeLists[i]。 当链表的长度过长则回收一部分内存对象到central cache。
实际上一开始在 thread cache 中是没有任何空间的它是通过一次次的向central cache索取内存并保留在自由链表中进而一步步壮大自己经过漫长的空间积累之后才可能使得空间分配仅在thread cache这部分进行。
但是如果thread cache中保存的空间过多也应该向central cache归还部分内存。 2.central cache整体设计
central cache也是一个哈希桶结构他的哈希桶的映射关系跟thread cache是一样的这样当thread cache 需要从central cache 中获取对象时直接就可以更容易的从相同映射的自由链表中获取。
不同的是他的每个哈希桶位置挂是SpanList链表结构不过每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。 申请内存
当thread cache中没有内存时就会批量向central cache申请一些内存对象central cache也有一个哈希映射的spanlistspanlist中挂着span从span中取出对象给thread cache这个过程是需要加锁的不过这里使用的是一个桶锁即每个spanlist拥有单独的锁尽可能提高效率。 central cache映射的spanlist中所有span的都没有内存以后则需要向page cache申请一个新的span对象拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从 span 中取对象给thread cache。 central cache的中挂的span中存在变量use_count记录分配了多少个对象出去分配一个对象给thread cache就use_count。
释放内存
当thread_cache过长或者线程销毁则会将内存释放回central cache中的释放回来时-- use_count。当use_count减到0时则表示所有对象都回到了span则将span释放回page cache page cache中会对前后相邻的空闲页进行合并以此来解决“外碎片”问题。
central cache基础结构
//饿汉单例模式
class CentralCache
{
public:static CentralCache* GetInstance(){return _sInet;}
private:SpanList _spanlists[NFREELISTS];//CentralCache哈希桶
private:CentralCache() {}CentralCache(const CentralCache) delete;static CentralCache _sInet;
};
由于central cache在我们的设计中只存在一个所以我们将其设计为单例模式只能创建出一个实例供多线程调用。 Span基础结构
// 管理多个连续页大块内存跨度结构
struct Span
{PageID _pageId 0;//大块内存起始页的页号size_t _n 0;//页的数量Span* _next nullptr;//双向链表结构Span* _prev nullptr;size_t _objSize 0; //切好的小对象的大小size_t _useCount 0;//切好小块内存被分配给thread cache的计数void* _freelist nullptr;//切好的小块内存的自由链表bool _isUse false;//该span是否被使用
};
SpanList基础结构
class SpanList
{
public://初始化SpanList(){_head new Span;_head-_next _head;_head-_prev _head;}//插入void Insert(Span* pos, Span* newspan){assert(pos);assert(newspan);Span* prev pos-_prev;prev-_next newspan;newspan-_prev prev;newspan-_next pos;pos-_prev newspan;}//删除void Erase(Span* pos){assert(pos);assert(pos ! _head);Span* prev pos-_prev;prev-_next pos-_next;pos-_next-_prev prev;}
private:Span* _head;
public:std::mutex _mtx;//桶锁
}; 3.page cache整体设计
page cache也是哈希桶结构但是其映射关系不再是内存大小而是页。每个页存放的仍然是spanlist链表但是每个span节点不再拥有小对象等上层central cache拿走之后自己进行切分。 从1页开始一直到128页。 最大内存空间为512KB即为了防止当上层要申请256KB的内存时可以多给一个备用。
申请内存
当central cache向page cache申请内存时page cache先检查对应位置有没有span如果没有 则向更大页寻找一个span如果找到则分裂成两个。比如申请的是4页page4页page后面没有挂span则向后面寻找更大的span假设在10页page位置找到一个span则将10页page span分裂为一个4页page span和一个6页page span4页的span交给上层6页的span则继续挂在6页链表中。如果找到_spanList[128]都没有合适的span则向系统使用mmap、brk或者是VirtualAlloc等方式 申请128页page span挂在自由链表中再重复上述过程。
释放内存
如果central cache释放回一个span则依次寻找span的前后page id的没有在使用的空闲span看是否可以合并如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span减少内存碎片。
page cache基础结构
class PageCache
{
public:static PageCache* GetInstance(){return _sInst;}
private:SpanList _spanlist[NPAGES];std::unordered_mapPageID, Span* _idSpanMap;//存放Pageid 和 span映射的mapPageCache(){}PageCache(const PageCache) delete;static PageCache _sInst;
public:std::mutex _mtx;
};
page cache 同样只有一个所以也将其设计为单例模式。 四.申请内存完整逻辑
thread cache 作为第一资源空间线程首先肯定是要向其申请空间代码如下
// 向thread cache申请内存对象
void* ThreadCache::Allocate(size_t size)
{assert(size MAX_BYTES);size_t alignSize SizeClass::RoundUp(size);//获取偏移量size_t index SizeClass::Index(size);//获取链表下标if (!_freelists[index].Empty())//不为空直接从头部获取{return _freelists[index].Pop();}else//为空从下一层central cache索取空间{return FetchFromCentralCache(index, alignSize);}
}
得到所需空间大小的偏移量及其索取的空间所在的桶的下标判断当前桶是否有小对象有则直接返回头部的小对象没有则向下层 centeal cache 索取空间
// 从中心缓存获取对象
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{//慢开始反馈调节//1.最开始不会向CentralCache一次要太多的小对象因为可能用不完//2.如果你不要这个size的大小内存要求那么batchNum就会不断增长直到上限//3.size越大一次向CentralCache要的batchNum就越小//4.size越小一次向CentralCache要的batchNum就越大size_t batchNum min(_freelists[index].MaxSize(),SizeClass::NumMoveSize(size));if (batchNum _freelists[index].MaxSize()){_freelists[index].MaxSize() 1;}void* start nullptr;void* end nullptr;//从CentralCache中获取 span并返回其数量size_t actualNum CentralCache::GetInstance()-FetchRangeObj(start, end, batchNum, size);assert(actualNum 1);if (actualNum 1)//只获取到一个直接返回{assert(start end);return start;}else//获取到多个返回第一个并将剩余的插入自由链表{_freelists[index].PushRange(NextObj(start), end);return start;}
}
这里存在一个问题到底要向central cache 获取多少小对象呢
引入一个新的函数
// 一次从central cache获取多少个小对象static size_t NumMoveSize(size_t size){// [2, 512]一次批量移动多少个对象的(慢启动)上限值// 小对象一次批量上限高// 大对象一次批量上限低int num MAX_BYTES / size;if (num 2)num 2;if (num 512)num 512;return num;}
通过该函数根据所需空间的大小来计算要为其分配多少个小对象合适。所需空间越小就为其分配越多的小对象反之则分配的越少而分配的数量限制在 [2,512] 之间。
但是实际上在前几次为其分配空间时也不能真的就为其分配512个小对象这样很可能导致空间用不完所以我们采用慢开始的方式为其分配的空间数量根据该桶中内存的流动情况即其最大内存数量来更合理的为其分配空间。
计算出需要为其分配多少个小对象之后我们就需要开始向 central cache 索取了
// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void* start, void* end, size_t batchNum, size_t size)
{//得到小对象所处的自由链表下表size_t index SizeClass::Index(size);_spanlists[index]._mtx.lock();//从自由链表中获取spanSpan* span GetOneSpan(_spanlists[index], size);assert(span);assert(span-_freelist);//从span中获取batchNum个小对象//如果数量够直接返回对应数量如果数量不够则全部返回start span-_freelist;end start;size_t actualNum 1;size_t i 0;while (i batchNum - 1 NextObj(end) ! nullptr){end NextObj(end);i;actualNum;}span-_freelist NextObj(end);NextObj(end) nullptr;span-_useCount actualNum;_spanlists[index]._mtx.unlock();return actualNum;
}
在向central cache索取空间时必须加锁防止多线程争抢资源而引发错乱。
首先仍然要确定你要索取的小对象所处的桶的下标index随后从链表中获取一个span获取span的逻辑我们随后在分析判断当前span中的小对象的数量是否满足数量够则给出对应的数量即可数量不够则全部给出这些对象通过start和end两个指针管理返回给上层函数同时将span的_useCount actualNum记录小对象被使用的数量。最后返回实际给出的小对象的个数。
上层函数 ThreadCache::FetchFromCentralCache 得到小对象之后将第一个小对象给到 thread cache 剩余的对象则插入到对应的自由链表中备用。
下面来看如何从central cache桶中获取一个span
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList list, size_t byte_size)
{//查看当前链表中是否有未分配的spanSpan* it list.Begin();while (it ! list.End()){if (it-_freelist ! nullptr)return it;elseit it-_next;}//到这里说明没有可以使用的span所以多个线程就算都进来也无法竞争//解锁可以让其他释放内存的线程进来list._mtx.unlock();//当前链表中没有未分配的span向page cache获取span需要加锁PageCache::GetInstance()-_mtx.lock();Span* span PageCache::GetInstance()-NewSpan(SizeClass::NumMovePage(byte_size));span-_isUse true;PageCache::GetInstance()-_mtx.unlock();//到这里获取到新的span但是由于还没有插入到自由链表中其他线程无法获取该span所以仍无需加锁//计算span的大块内存的起始地址和大块内存的大小字节数char* start (char*)(span-_pageId PAGE_SHIFT);size_t bytes span-_n PAGE_SHIFT;char* end start bytes;//切分span大块内存成小块用自由链表链接起来//1.先切下来一块做头部方便进行尾插span-_freelist start;start byte_size;void* tail span-_freelist;//2.切块进行尾插while (start end){NextObj(tail) start;tail start;start byte_size;}NextObj(tail) nullptr;//切好span后需要将span挂到桶里此时需要进行加锁list._mtx.lock();list.PushFront(span);return span;
}
通过循环遍历该span链表中是否还有拥有小对象的span有则直接返回没有的话则需要进一步从下层 page cache 获取span。如何向下层获取span我们下文再分析如果已经向下文得到了一个span因为page cache给出的是一整块页内存所以要将这些内存进行切块。
先通过该块内存的页id计算出其起始地址通过startend两个指针将整个大块内存进行切分并链接成自由链表随后将该span插入桶中并返回。
下面来看如何向 page cache获取span
由于page cache的桶的映射关系是页所以你必须告诉我你需要获取一个几页的span
// 计算一次向系统获取几个页// 单个对象 8byte// ...// 单个对象 256KBstatic size_t NumMovePage(size_t size){size_t num NumMoveSize(size);size_t npage num * size;npage PAGE_SHIFT;if (npage 0)npage 1;return npage;}
通过上述计算方法根据所需数据的大小计算出需要多少页 并返回当然就算是最小的数据索取时也必须给出一页内存。
// 获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{assert(k 0 k NPAGES);//先检查第k个桶中是否存在spanif (!_spanlist[k].Empty()){Span* kSpan _spanlist[k].PopFront();//建立pageid 和 span 之间的映射关系方便central cache回收小块内存时查找对应的spanfor (PageID i 0; i kSpan-_n; i){_idSpanMap[kSpan-_pageId i] kSpan;}return kSpan;}//检查后边的桶中有没有span有就进行切分for (int i k 1; i NPAGES; i){if (!_spanlist[i].Empty()){Span* nSpan _spanlist[i].PopFront();Span* kSpan new Span;//在nSpan的头部切下k页给到kSpan// 返回kSpan//nSpan剩余的页数插入对应的自由链表中kSpan-_pageId nSpan-_pageId;kSpan-_n k;//nSpan头部被切去其新页id和页数量要发生变化nSpan-_pageId k;nSpan-_n - k;//剩余的nSpan的首尾也需要建立 pageid 和 span 的映射关系_idSpanMap[nSpan-_pageId] nSpan;_idSpanMap[nSpan-_pageId nSpan-_n - 1] nSpan;_spanlist[nSpan-_n].PushFront(nSpan);//建立pageid 和 span 之间的映射关系方便central cache回收小块内存时查找对应的spanfor (PageID i 0; i kSpan-_n; i){_idSpanMap[kSpan-_pageId i] kSpan;}return kSpan;}}//整个page cache都没有Span需要向堆中获取128page的spanSpan* bigSpan new Span;void* ptr SystemAlloc(NPAGES - 1);bigSpan-_pageId (PageID)ptr PAGE_SHIFT;bigSpan-_n NPAGES - 1;//将获得的128page插入对应的桶中_spanlist[bigSpan-_n].PushFront(bigSpan);//递归调用函数获取k页的spanreturn NewSpan(k);
}
首先判断出当前所需页对应的桶中是否存在span不存在则循环查询其后边的桶查看其中有没有span如果有则将该span进行拆分返回所需的页span将剩余的span重新插入对应的桶中。切分之后我们需要在map中建立页id和span的映射关系方便后续释放内存进行。
如果后续的所有page cache桶都不存在span了则就需要从系统的堆空间中获取一个128页的span将该span直接插入桶中随后采用递归的方式重新获取span并返回。
向堆索取空间如下
//向堆获取内存空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32void* ptr VirtualAlloc(0, kpage PAGE_SHIFT, MEM_COMMIT | MEM_RESERVE,PAGE_READWRITE);
#else
// linux下brk mmap等
#endifif (ptr nullptr)throw std::bad_alloc();return ptr;
} 根据系统的不同采用不同的函数获取。 以上就是该高并发内存池的整个申请内存的逻辑能够看出程序一开始运行时整个cache是没有任何内存空间的必须通过上层逐步的向下层索取并积累最终才能实现自给自足。 五.释放内存完整逻辑
当一个对象要被释放时要将该对象重新插入到 thread cache 桶中
// 向thread cache释放内存对象
void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size MAX_BYTES);//得到要返回的桶的自由链表并插入进去size_t index SizeClass::Index(size);_freelists[index].Push(ptr);//当链表长度大于一次申请的内存时就开始返还一段list给central cacheif (_freelists[index].Size() _freelists[index].MaxSize()){ListTooLong(_freelists[index], size);}
}
我们规定当该桶中对象的个数大于等于该桶的最大对象数量时就将这一段 list 返回给 central cache
// 释放对象时链表过长时回收内存回到central cache
void ThreadCache::ListTooLong(FreeList list, size_t size)
{void* start nullptr;void* end nullptr;list.PopRange(start, end, list.MaxSize());// 将一定数量的对象释放到central cache span跨度CentralCache::GetInstance()-ReleaseListToSpans(start, size);
}
首先要将该段链表从桶中删除而后得到其起始地址start
//删除一段链表void PopRange(void* start, void* end, size_t n){assert(n _size);start _freelist;end start;for (size_t i 0; i n - 1; i){end NextObj(end);}_freelist NextObj(end);NextObj(end) nullptr;_size - n;} 通过起始地址及其每个小对象的大小将该段内存释放为 central cache
// 将一定数量的对象释放到span跨度
void CentralCache::ReleaseListToSpans(void* start, size_t byte_size)
{//确定要归还到的是哪个桶size_t index SizeClass::Index(byte_size);_spanlists[index]._mtx.lock();//循环找到各个小对象对应的span并将其归还while (start){void* next NextObj(start);//根据pageid找到要归还的是桶中的哪一个spanSpan* span PageCache::GetInstance()-MapObjectToSpan(start);//头插进spanNextObj(start) span-_freelist;span-_freelist start;span-_useCount--;//如果该span所有的小对象都回来了则将该span返还给page cacheif (span-_useCount 0){_spanlists[index].Erase(span);span-_freelist nullptr;span-_next nullptr;span-_prev nullptr;//释放该span给page cache 该span不会再被竞争可以释放锁_spanlists[index]._mtx.unlock();//归还span使用Page cache锁PageCache::GetInstance()-_mtx.lock();PageCache::GetInstance()-ReleaseSpanToPageCache(span);PageCache::GetInstance()-_mtx.unlock();//span归还之后需要重新加锁_spanlists[index]._mtx.lock();}start next;}_spanlists[index]._mtx.unlock();
}
首先我们仍然要根据小对象的大小计算出其所要返回的桶下标随后因为每个小对象都是由span切分而来的所以我们希望让每个小对象返回时都能挂在其一开始所在的 span 所以我们需要知道这些小对象到底来自哪个span
// 获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{PageID id (PageID)obj PAGE_SHIFT;std::unique_lockstd::mutex lock(_mtx);auto ret _idSpanMap.find(id);if (ret ! _idSpanMap.end())return ret-second;else{assert(false);return nullptr;}
} 通过小对象的起始地址能够计算出其所在的页的id通过map找到其对应的span并返回。
在寻找span时也需要对其进行加锁防止发生前一个线程还没找到后一个线程就将其释放的情况发生。
由于每个小对象都可能来自不同的span所以需要一个一个的循环寻找 找到对应的span之后将小对象头插进该span此时当该 span的_useCount 0 时说明此时该 span 中的所有小对象都回来了该 span 很有可能短期内不在被使用所以需要将其整个释放给 page cache
// 释放空闲span回到Pagecache并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{//对span的前页尝试进行合并缓解内存碎片问题while (1){PageID previd span-_pageId - 1;auto ret _idSpanMap.find(previd);//前边的页号没有不合并if (ret _idSpanMap.end())break; //前边相邻页在使用不能合并Span* prevSpan ret-second;if (prevSpan-_isUse)break;//两块页数相加大于128不合并if (span-_n prevSpan-_n NPAGES - 1)break;//排除上述所有情况可以进行合并span-_pageId prevSpan-_pageId;span-_n prevSpan-_n;//将前页从桶中删除_spanlist[prevSpan-_n].Erase(prevSpan);delete prevSpan;}//对span的后页尝试进行合并缓解内存碎片问题while (1){PageID nextid span-_pageId span-_n;auto ret _idSpanMap.find(nextid);//后边的页号没有不合并if (ret _idSpanMap.end())break;//后边相邻页在使用不能合并Span* nextSpan ret-second;if (nextSpan-_isUse)break;//两块页数相加大于128不合并if (span-_n nextSpan-_n NPAGES - 1)break;//排除上述所有情况可以进行合并span-_n nextSpan-_n;//将后页从桶中删除_spanlist[nextSpan-_n].Erase(nextSpan);delete nextSpan;}//将合并好的span插入桶中并将其首尾页在map中映射_spanlist[span-_n].PushFront(span);span-_isUse false;_idSpanMap[span-_pageId] span;_idSpanMap[span-_pageId span-_n - 1] span;}
span 回来之后 由于其本身是一个页切分后的产物所以为了解决内存碎片问题自然是要将该页与其相邻的前后页进行合并。
首先根据 Pageid 与 span的映射关系得到该 span 对应的pageid该 pageid - 1就得到了其前一页的id在通过map映射查看该页id是否存在不存在就不能合并
如果存在则需要根据_isUse查看该页是否正在被使用如果正在被使用不能合并
如果没有被使用则还需计算这两个页的页数相加是否大于该哈希桶中所能存放的最大的页数量超过数量不能合并
不超过则所有条件满足开始进行合并合并也非常简单因为这两个页如果能合并说明它们在物理内存中本身就是一段连续内存所以直接让 span 继承其相邻前页的起始页id在加上其页数量这样span就成为两个页合并后的新页再将指向前页的指针删除即可。
向后合并和向前合并的逻辑基本一致就不在详细分析由于页的合并不仅仅只是进行一次合并之后仍然可以继续向前或向后寻找所以设置为循环合并直到不能合并才退出循环。
合并之后需要将新页插入到其对应的新桶中并记录 pageid 和 span 的映射。
以上就是高并发内存池释放内存的整个逻辑。当内存释放到page cache后并不需要在将内存还给堆因为这些内存需要在 page cache 中积攒而且当进程退出时也会自动释放这些内存。 六.获取更大内存
在一开始我们就规定该高并发内存池主要服务于所需内存小于等于256KB的用户但是如果所需内存超过256KB又该如何申请和释放内存呢 1.申请内存
如果内存超过256KB我们就无法通过thread cache和 central cache来获取内存而应直接向page cache索要一个span因为256KB是32页而我们设计的page cache的最大页数为128。
static void* ConcurrentAlloc(size_t size)
{//所需空间大于内存池规定的最大分配空间if (size MAX_BYTES){size_t alignsize SizeClass::RoundUp(size);size_t kpage alignsize PAGE_SHIFT;PageCache::GetInstance()-_mtx.lock();Span* span PageCache::GetInstance()-NewSpan(kpage);PageCache::GetInstance()-_mtx.unlock();void* ptr (void*)(span-_pageId PAGE_SHIFT);return ptr;}else//所需空间不大于内存池规定的最大分配空间{//...}
}
大于256KB的内存以页作为其偏移量通过计算该内存的偏移量就能够计算出其页数量进而向page cache索取span。
此时我们还需面临一个问题那就是如果该内存大于128页甚至超过了page cache的最大内存限制该怎么办
很简单因为这种内存申请情况一般是很少很少的情况所以直接向堆申请空间即可
// 获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{assert(k 0);//如果所需页数大于128则直接向堆获取if (k NPAGES - 1){void* ptr SystemAlloc(k);Span* span new Span;span-_pageId (PageID)ptr PAGE_SHIFT;span-_n k;_idSpanMap[span-_pageId] span;return span;}//...
}
申请出span之后仍要将其 pageid 和 span 进行映射来方便后续的内存释放。 2.释放内存
释放内存就不多说什么了直接看代码
static void Concurrentfree(void *ptr, size_t size)
{//要释放的空间大于内存池的限定最大空间if (size MAX_BYTES){Span* span PageCache::GetInstance()-MapObjectToSpan(ptr);PageCache::GetInstance()-_mtx.lock();PageCache::GetInstance()-ReleaseSpanToPageCache(span);PageCache::GetInstance()-_mtx.unlock();}else{//...}
}
先通过map映射获取到span随后进行释放当然在释放内存时如果该span的页数量大于128页则直接调用系统函数将其释放回堆空间
// 释放空闲span回到Pagecache并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{//如果要释放的空间大于128页if (span-_n NPAGES - 1){void* ptr (void*)(span-_pageId PAGE_SHIFT);SystemFree(ptr);delete span;return;}//...
}
以上就是当所需内存大于256KB时如何实现其申请与释放。 七.进一步优化
1.脱离使用new
要知道该高并发内存池设计的初衷是实现申请内存的效率比malloc更高效的tcmalloc那么就不能在代码中出现任何有关malloc的痕迹包括new对象因为new的底层也是malloc。
那么该如何取代new呢这里设计出一个定长内存池用于取代new的使用
templateclass T
class ObjectPool
{
public:T* New(){T* obj nullptr;//优先把还回来的内存对象再次重复利用if (_freelist){void* next *((void**)_freelist);obj (T*)_freelist;_freelist next;}else {//剩余内存不够一个对象重新开辟大块空间if (_remainBytes sizeof(T)){_remainBytes 128 * 1024;//_memory (char*)malloc(_remainBytes);_memory (char*)SystemAlloc(_remainBytes 13);if (_memory nullptr){throw std::bad_alloc();}}obj (T*)_memory;//如果调用的空间不够存储链表指针size_t objSize sizeof(T) sizeof(void*) ? sizeof(void*) : sizeof(T);_memory objSize;_remainBytes - objSize;}//定位new显示调用T的构造函数初始化new(obj)T;return obj;}//回收空间将空间头插进链表void Delete(T* obj){//显示调用T的析构函数清理对象obj-~T();//头插*(void**)obj _freelist;_freelist obj;}
private:char* _memory nullptr;//指向大内存块的指针size_t _remainBytes 0;//大内存块剩余的大小void* _freelist nullptr;//链接返还内存块的自由链表头指针
}; 该定长内存池的设计可以说是高并发内存池的一个前身通过直接获取一个大块内存然后分块进行申请释放用链表来管理返还回来的块内存。
使用该定长内存池来替换new的使用就可以进一步提高效率。 2.释放内存无需传内存大小
在前边分享释放内存的代码时都需要传入要释放的内存的大小显得有些繁琐且不舒适获取内存时传入要获取的内存大小这是必然的但是释放内存能否就不传入内存的大小呢
很简单因为从头到尾大大小小的内存都和span有关所以只需在span结构体中加入要分配的内存的大小这一字段当需要得到内存大小时直接从span中获取即可 size_t _objSize 0; //切好的小对象的大小 随后修改部分代码在获取一个新span时记录其要分配的内存大小释放内存时也从span中获取内存大小。
static void Concurrentfree(void *ptr)
{//要释放的空间大于内存池的限定最大空间Span* span PageCache::GetInstance()-MapObjectToSpan(ptr);size_t size span-_objSize;if (size MAX_BYTES){PageCache::GetInstance()-_mtx.lock();PageCache::GetInstance()-ReleaseSpanToPageCache(span);PageCache::GetInstance()-_mtx.unlock();}else{assert(pTLSThreadCache);pTLSThreadCache-Deallocate(ptr, size);}
} 八.性能分析
// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vectorstd::thread vthread(nworks);std::atomicsize_t malloc_costtime 0;std::atomicsize_t free_costtime 0;for (size_t k 0; k nworks; k){vthread[k] std::thread([]() {std::vectorvoid* v;v.reserve(ntimes);for (size_t j 0; j rounds; j){size_t begin1 clock();for (size_t i 0; i ntimes; i){//v.push_back(ConcurrentAlloc(16));v.push_back(ConcurrentAlloc((16 i) % 8192 1));}size_t end1 clock();size_t begin2 clock();for (size_t i 0; i ntimes; i){ConcurrentFree(v[i]);}size_t end2 clock();v.clear();malloc_costtime (end1 - begin1);free_costtime (end2 - begin2);}});}for (auto t : vthread){t.join();}printf(%u个线程并发执行%u轮次每轮次concurrent alloc %u次: 花费%u ms\n,nworks, rounds, ntimes, malloc_costtime.load());printf(%u个线程并发执行%u轮次每轮次concurrent dealloc %u次: 花费%u ms\n,nworks, rounds, ntimes, free_costtime.load());printf(%u个线程并发concurrent allocdealloc %u次总计花费%u ms\n,nworks, nworks * rounds * ntimes, malloc_costtime.load() free_costtime.load());
}
int main()
{size_t n 10000;BenchmarkConcurrentMalloc(n, 4, 10);return 0;
}
上述代码是一个针对于高并发线程池申请和释放代码的性能的一个测试代码其中用到了atomic类用于原子性的计算时间防止多个线程并发导致时间计算出错的情况。
将高并发内存池与malloc对比能够发现性能确实提升了不少 1.寻找性能瓶颈
上述代码测试的是4个线程并发执行10轮每轮申请或释放内存10000次的性能情况。
对该测试代码进行性能分析寻找性能瓶颈得到如下结果 能够看出整个代码运行过程中释放内存所占用的时间最多进一步深入查看释放内存的逻辑能够得出如下结果 在执行通过map隐射找到对应的span时对该过程进行加锁解锁消耗了巨大的时间。 2.针对性能瓶颈进一步优化
在tcmalloc源码中大佬们使用了基数树来存放pageid和span的映射关系基数树与unordered_map的结构不同基数树使用了一种类似于指针数组的树形存储结构通过指针连接各个节点每个节点存储一个键值对它的结构是固定的并不会像哈希表结构在插入和删除操作时可能会发生变化如扩容、重新哈希等因此基数树可以实现读写分离在多个线程并发访问时读操作就不用加锁了。 基数树也是一种分层结构每一层划分一定的键值范围。当基数树为高并发内存池服务时通常将其设计为三种结构分别是一层二层三层三种结构其中一层、二层结构适用于32位系统下的高并发内存池三层则适用于64位系统下的高并发内存池。 下面仅解析一层、二层结构的基数树。
基数树代码来源于tcmalloc源码我们将其部分进行修改以服务于高并发内存池。 1一层基数树
// Single-level array
template int BITS
class TCMalloc_PageMap1 {
private:static const int LENGTH 1 BITS;void** array_;
public:typedef uintptr_t Number;//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) explicit TCMalloc_PageMap1(){//array_ reinterpret_castvoid**((*allocator)(sizeof(void*) BITS));size_t size sizeof(void*) BITS;size_t alignSize SizeClass::_RoundUp(size, 1 PAGE_SHIFT);array_ (void**)SystemAlloc(alignSize PAGE_SHIFT);memset(array_, 0, sizeof(void*) BITS);}// Return the current value for KEY. Returns NULL if not yet set,// or if k is out of range.void* get(Number k) const {if ((k BITS) 0) {return NULL;}return array_[k];}// REQUIRES k is in range [0,2^BITS-1].// REQUIRES k has been ensured before.//// Sets the value v for key k.void set(Number k, void* v) {array_[k] v;}
}; 在32为系统下根据页的大小总共能存储2 ^ 32 / 2 ^ 13 2 ^ 19个页因此BITS要传入19在构造函数中要为指针数组array_开辟空间这里该用了直接从系统堆空间来获取。
随后直接根据页号通过 get 和 set 函数来获取和写入映射。 2二层基数树
// Two-level radix tree
template int BITS
class TCMalloc_PageMap2 {
private:// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.static const int ROOT_BITS 5;static const int ROOT_LENGTH 1 ROOT_BITS;static const int LEAF_BITS BITS - ROOT_BITS;static const int LEAF_LENGTH 1 LEAF_BITS;// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodesvoid* (*allocator_)(size_t); // Memory allocatorpublic:typedef uintptr_t Number;//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) explicit TCMalloc_PageMap2(){//allocator_ allocator;memset(root_, 0, sizeof(root_));PreallocateMoreMemory();}void* get(Number k) const {const Number i1 k LEAF_BITS;const Number i2 k (LEAF_LENGTH - 1);if ((k BITS) 0 || root_[i1] NULL) {return NULL;}return root_[i1]-values[i2];}void set(Number k, void* v) {const Number i1 k LEAF_BITS;const Number i2 k (LEAF_LENGTH - 1);ASSERT(i1 ROOT_LENGTH);root_[i1]-values[i2] v;}//开辟空间bool Ensure(Number start, size_t n) {for (Number key start; key start n - 1;) {const Number i1 key LEAF_BITS;// Check for overflowif (i1 ROOT_LENGTH)return false;// Make 2nd level node if necessaryif (root_[i1] NULL) {//Leaf* leaf reinterpret_castLeaf*((*allocator_)(sizeof(Leaf)));//if (leaf NULL) return false;static ObjectPoolLeaf LeafPool;Leaf* leaf (Leaf*)LeafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] leaf;}// Advance key past whatever is covered by this leaf nodekey ((key LEAF_BITS) 1) LEAF_BITS;}return true;}void PreallocateMoreMemory() {// Allocate enough to keep track of all possible pagesEnsure(0, 1 BITS);}
};
二层基数树不同于一层的点在于它将2 ^ 19个节点分为两层来连接第一层有2 ^ 5个节点每个节点中再连接 2 ^ 14个节点类似于二维数组的树形机构。
总体来说一层基数树和二层基数树在空间占据上是相同的而二层基数树的优势在于可以不用直接开辟 2 ^ 19个节点当你需要用到对于键值范围的节点时再进行开辟。
但是上述代码在设计时依然是直接将所有的空间都开辟好了。
同时在 get 和 set 的调用中需要分别计算出对应键值所在的第一层和第二层的位置也就是页号k 的高5位和低14位。
三层基数树的结构与二层类似这里就不在过多分享。 使用基数树优化之后结果如下 与优化前相比性能提升了一倍不同的计算机系统的优化力度也会不一样。 总结
以上就是高并发内存池项目的完整分享啦第一次写一个大项目出现了很多的问题发现了很多知识层面的掌握不牢或者忘记的情况但是完整写完整个项目之后感觉自己的代码能力有了很大的提升对于相关知识的掌握也有了质的飞跃。
最后完整代码放在Gitee仓库ConcurrentMemoryPool · 楠朋友学编程/MyProjects - 码云 - 开源中国