宁波网站设计推荐荣盛网络,国外做机械设计任务的网站,网站 前台 设计要求,wordpress商城主题破解版文章目录 LFU 算法理论介绍算法实现数据结构查询操作插入/更新操作 Redis 缓存淘汰算法缓存污染难题Redis LFU缓存淘汰策略 本篇博客的主要内容#xff1a;
以图解的方式#xff0c;介绍 LFU 算法的一种实现#xff1b;介绍 LFU 算法在 Redis 中的应用。
LFU 算法
理论介… 文章目录 LFU 算法理论介绍算法实现数据结构查询操作插入/更新操作 Redis 缓存淘汰算法缓存污染难题Redis LFU缓存淘汰策略 本篇博客的主要内容
以图解的方式介绍 LFU 算法的一种实现介绍 LFU 算法在 Redis 中的应用。
LFU 算法
理论介绍
最不常使用 (LFU Least Frequently Used) 算法是一种常用的缓存置换算法旨在移除 访问频次最低 的缓存项。与常用的 LRU 算法不同LFU 算法更加重视元素的访问频率而非最近一次访问时间。
实现 LFU 淘汰算法的缓存需要满足如下规则
缓存中键值对的数目不能超过容量 capacity。get(key) 操作如果键 key 存在于缓存则返回该键否则空值。put(key, val) 操作如果 key 已经存在变更其值如果 key 不存在插入键值对。当缓存达到其容量 capacity 时则应该在插入新项之前移除 最不经常使用 的键值对。如果存在两个或更多键具有相同的最小频率应该去除 最近最久未使用(LRU) 的键值对。
解释下高亮部分
最不经常使用的键值对使用频数最少的键使用 get(key) 或 put(key, val) 访问键对都会增加键的频数。当一个键首次插入到缓存中时它的使用计数器被设置为 1。最近最久未使用的键值对缓存中键的最小频数为 count_min当需要淘汰缓存时可能存在多个键的频数等于 count_min。对于这种情况针对频数等于 count_min 的键值对采用 LRU最近最久未使用算法淘汰。 算法实现
本节介绍如何设计 get、put 函数以 O(1) 时间复杂度运行 的 LFU 缓存。
数据结构
缓存中的键值对通过一个全局双向循环链表存储链表节点 Node 的定义如下
struct Node{int key;int val;Node* prev;Node* next;int count;Node(int k, int v): key(k), val(v), prev(nullptr), next(nullptr), count(1) {}Node(int k, int v, Node *p, Node *n): key(k), val(v), prev(p), next(n), count(1) {}
};key、val键值对的键与值prev前驱节点next后继节点count节点的访问次数新节点插入时设置初始值 1。
缓存中使用字段 head 存储双向循环链表的头节点头节点是 链表中频数最高的节点
Node *head nullptr;LFU 缓存是一个由多个 链表分段 拼装而成的双向循环链表相同链表段中的节点频数相等。示意图如下 当一个频数 count 的节点被访问频数将递增为 count 1我们需要在 O(1) 的时间将该节点从频数 count 的链表分段移动到频数 count 1 的链表分段头部。
因此需要一个哈希表 count2SegHead 维护频数count 到链 表分段头节点 的映射
unordered_mapint, Node* count2SegHead;示意图如下 为了在 O(1) 时间复杂度通过 key 访问节点需要使用哈希表 key2Node 维护 键key 到 节点 的映射
unordered_mapint, Node* key2Node;示意图如下 查询操作
查询操作步骤如下
通过哈希表 key2node 查询到指定键的节点 node将 node 节点从双向循环链表中移除节点频数为 count找到 count 1 的链表分段头节点 seg_head更新 node.count count 1将 node 节点插入 seg_head 前同时设置 count2SegHead[count 1] node即设置 node 节点为频数 count 1 的链表段头节点。 下面这副示意图展示了查询 key8 节点的流程
通过 key2Node 找到节点位置key8 节点的频数为 3。从频数 3 的链表分段中移除 key8 节点查询 count2SegHead 得到频数为 314 的链表分段头节点key3将 key8 节点的频数更新为 4然后插入到 key3 节点的前驱。最后更新 head 指针以及 count2SegHead[4] node 查询函数 getNode 代码实现如下
#include bits/stdc.husing namespace std;class LFUCache {struct Node{int key;int val;Node* prev;Node* next;int count;Node(int k, int v): key(k), val(v), prev(nullptr), next(nullptr), count(1) {}Node(int k, int v, Node *p, Node *n): key(k), val(v), prev(p), next(n), count(1) {}};unordered_mapint, Node* key2Node;unordered_mapint, Node* count2SegHead; // 频数到链表分段的映射int capacity;int size 0;Node *head nullptr; // 双向循环链表头节点// 将链表中的 node 节点移动到 newCount 链表段void moveNode(Node* node, int newCount) { // ------(1)auto nextNode remove(node);node-count newCount;if(count2SegHead.find(newCount) count2SegHead.end())insert(node, nextNode);elseinsert(node, count2SegHead[newCount]);}// 将节点插入到链表段头节点 segHead 前, 同时维护 head 指针和 count2LinkedListvoid insert(Node* node, Node* segHead) {count2SegHead[node-count] node;if(segHead nullptr) {node-next node-prev node;head node;return;}node-next segHead, node-prev segHead-prev;node-prev-next node, node-next-prev node;if(head-count node-count) head node;}// remove 函数Node* remove(Node *node){// 循环链表中仅有node节点if(node-next node){count2SegHead.erase(node-count);head nullptr;return nullptr;}Node* next;// node 节点为 node-count 链表段的头节点if(count2SegHead[node-count] node) {if(node-next-count node-count)count2SegHead[node-count] node-next;elsecount2SegHead.erase(node-count);next node-next;} // node 不是链表段头节点else next count2SegHead[node-count];node-prev-next node-next, node-next-prev node-prev;node-prev node-next nullptr;if(head node) head next;return next;}public:LFUCache(int cap) {this-capacity cap;}Node* getNode(int key){if(key2Node.find(key) key2Node.end()) return nullptr;auto node key2Node[key];moveNode(node, node-count 1); // ------(1)return node;}
};moveHead(Node* node, int newCount)将循环链表中的节点 node 移动到频数为 newCount 的链表段头部。 moveHead 执行 remove(Node *node) 函数从双向循环链表中移除 node 节点
如果循环链表中只有 node 节点将 head 设置为 nullptrcount2SegHead 哈希表移除频数为 node-count 的键值对如果 node 为 node-count 链表段的头节点考察 node-next 的频数 如果相等将 node-next 设置为链表段新的头节点如果不相等说明 node 为该链表段的唯一节点移除 node 后链表段也被移除count2SegHead 移除 node-count 键值对。 如果 node 不为链表段头节点可以直接移除无需维护 head 及 count2SegHead返回值node-count 链表段移除 node 后的头节点如果链表段已移除则返回该链表段的后继链表段头节点。
Node* remove(Node *node){// 循环链表中仅有node节点if(node-next node){count2SegHead.erase(node-count);head nullptr;return nullptr;}Node* next;// node 节点为 node-count 链表段的头节点if(count2SegHead[node-count] node) {if(node-next-count node-count)count2SegHead[node-count] node-next;elsecount2SegHead.erase(node-count);next node-next;} // node 不是链表段头节点else next count2SegHead[node-count];node-prev-next node-next, node-next-prev node-prev;node-prev node-next nullptr;if(head node) head next;return next;
}移除 node 节点后使用 insert(Node* node, Node* segHead) 将 node 插入到链表段 segHead 的头部
将 node 设置为 node-count 链表段新头节点count2SegHead[node-count] node;如果 segHead 为 nullptr说明全局链表为空执行双向循环链表的初始化操作如果 segHead 非空将 node 插入到 segHead 的前驱。如果 node.count 操作计数大于等于当前头节点 head 的计数将 node 设置为新的头节点。 // 将节点插入到链表段头节点 segHead 前, 同时维护 head 指针和 count2LinkedListvoid insert(Node* node, Node* segHead) {count2SegHead[node-count] node; // 设置为新的链表段头节点if(segHead nullptr) {// segHead 为 null, 说明全局链表为空, 执行初始化操作node-next node-prev node;head node;return;}// 双向链表插入操作node-next segHead, node-prev segHead-prev;node-prev-next node, node-next-prev node;// 如果 node 操作计数大于等于当前头节点的计数, 更新 headif(head-count node-count) head node;}插入/更新操作
插入/更新操作的步骤如下
使用 getNode 查询键为 key 的节点如果节点存在只需要更新 node.val 为 value 新值频数更新、双向循环链表的维护由 getNode 函数完成。如果节点不存在则执行新节点插入操作新节点的初始频数 count 等于 1 缓存大小等于容量则淘汰缓存中频数最少的节点如果频数最少的节点非唯一淘汰最近一次访问时间最早LRU的节点。缓存大小小于容量将节点插入到频数 1 的链表段头节点之前。 还需要考虑到当前不存在频数为 1 的节点也即不存在频数为 1 的链表段。这种情况下将新节点插入到 head 节点之前即可。
void put(int key, int value) {if(capacity 0) return;Node *node getNode(key);if(node nullptr) {// 键等于 key 的节点不存在, 插入新的节点if(size capacity){// 容量已经达到上限, 淘汰频数最低的节点, 即 head.prevauto evict head-prev;remove(evict);key2Node.erase(evict-key);delete evict;} else size;// 新的节点, node new Node(key, value, nullptr, nullptr);key2Node[key] node;if(count2SegHead.find(1) count2SegHead.end())// 如果不存在频数为 1 的链表段, 将新节点插入到head的前驱;insert(node, head);else // 如果存在频数等于 1 的链表段, 新节点插入到该链表段的头节点前insert(node, count2SegHead[1]);} else node-val value; // 更新键等于 key 的节点值
}下面我用两个场景带大家理解 LFU 缓存的插入操作
场景1插入节点后缓存大小(size7) 未超出 容量capacity(7)新节点(key4) 插入到频数等于 1 的链表段前成为该链表段的头节点。示意图如下 场景2插入节点后缓存大小(size7) 超出 容量capacity(6)先 淘汰操作频数最小的节点 (key7)此时频数为 1 的链表段将被移除新节点(key4) 插入到头节点(key3) 之前。示意图如下 Redis 缓存淘汰算法
Redis 中实现了基于 LFU 的缓存淘汰策略volatile-lfu 和 allkeys-lfu。在介绍该策略之前我们先了解下 缓存污染 的概念。
缓存污染难题
缓存污染 是指 访问很少的数据 在服务完访问请求后还继续 留存在缓存中造成缓存空间的浪费。
缓存污染一旦变得严重后有大量不再访问的数据滞留在缓存中往缓存中写入新数据时需要先把数据逐步淘汰出缓存引入额外的操作时间开销。 如何解决缓存污染问题
volatile-ttl 策略如果业务应用在设置过期时间时明确知道数据的访问情况即数据的时效性Redis 按照数据的剩余最短存活时间淘汰数据可以避免缓存污染。volatile-lru、all-lru淘汰候选数据集中lru 字段最小最近一次访问时间最久的数据。
LRU 方案的缺陷只考虑了数据的访问时间没有考虑数据的访问频数在处理 扫描式单次查询操作 时无法解决缓存问题 幸运的是Redis 从 4.0 版本开始增加了 LFU 淘汰策略从 数据访问的时效性 、数据访问次数 两个角度筛选出需要淘汰的数据。 Redis LFU缓存淘汰策略
在实现 LRU 算法时Redis 使用 RedisObject 结构来保存数据的该结构的 lru 字段记录数据最近一次访问的时间戳。实现 LFU 算法时将原来 24bit 的 lru 字段进一步拆分成两个部分
ldt 值lru 字段的前 16bit表示数据的访问时间戳counter 值lru 字段的后 8bit表示数据的访问次数。
Redis使用LFU策略淘汰数据时选择 lru 字段最小的数据进行淘汰。等价于 优先淘汰访问次数少的数据访问次数相等则淘汰时间戳最小 的数据。 注意count 为计数器的值初始默认为 5 而不是 1。如果初始值为 1刚被写入缓存的数据可能会因为使用次数太少而立即被淘汰。 counter 值 8bits最大值 255如果采用线性增加的方式 counter 很快就会达到上限Redis 就不能很好筛选访问 1000次 和 10000 次的数据。因此Redis 采用了非线性的频数更新策略 p 1 c o u n t × l f u _ l o g _ f a c t o r 1 p\frac{1}{count \times {lfu\_log\_factor} 1} pcount×lfu_log_factor11
每次缓存的数据被访问 counter 加 1 的概率等于 p p p源码如下
double r (double)rand()/RAND_MAX;
...
double p 1.0/(baseval*server.lfu_log_factor 1);
if (r p) counter; 通过设置 lfu_log_factor 配置项可以控制计数器的增加速度避免 counter 值过快到达255。下图所示当 lfu_log_factor 等于100时小于10M 的数据能被区分出来。 一般可以将 lfu_log_factor 取值为 10已经足够区分1k、1w、10w的数据访问量。 counter的衰减机制
LFU 策略使用衰减因子配置项 lfu_decay_time 来控制访问次数的衰减。
LFU 策略会计算当前时间和数据最近一次访问时间的差值并把这个差值换算成以分钟为单位。然后LFU 策略再把这个差值除以 lfu_decay_time 值所得的结果就是数据 counter 要衰减的值。 假设 lfu_decay_time 取值为 1如果数据在 N 分钟内没有被访问那么它的访问次数就要减 N。
lfu_decay_time 取值更大那么相应的衰减值会变小衰减效果也会减弱。如果业务应用中有短时高频访问的数据的话建议把 lfu_decay_time 值设置为 1在数据不被访问后可以 迅速衰减访问次数从缓存中淘汰出去避免缓存污染。 总结LFU 淘汰策略对于扫描式单次数据读取操作时虽然仍然会频繁写缓存但可以 避免访问次数高的热点数据因为最近一次访问时间较早而被淘汰提升了缓存的命中率。