旅游网站建设的方法,环球军事头条,各网站文风,网站ui设计软件引言
本文参考小徐先生的相关博客整理#xff0c;项目地址为#xff1a; https://github.com/xiaoxuxiansheng/timewheel/blob/main/redis_time_wheel.go。主要是完善流程以及记录个人学习笔记。 分布式版实现
本章我们讨论一下#xff0c;如何基于 redis 实现分布式版本的…引言
本文参考小徐先生的相关博客整理项目地址为 https://github.com/xiaoxuxiansheng/timewheel/blob/main/redis_time_wheel.go。主要是完善流程以及记录个人学习笔记。 分布式版实现
本章我们讨论一下如何基于 redis 实现分布式版本的时间轮以贴合实际生产环境对分布式定时任务调度系统的诉求.
redis 版时间轮的实现思路是使用 redis 中的有序集合 sorted set简称 zset 进行定时任务的存储管理其中以每个定时任务执行时间对应的时间戳作为 zset 中的 score完成定时任务的有序排列组合.
zset 数据结构的 redis 官方文档链接https://redis.io/docs/data-type这里简单看一下使用。 Redis 的 ZSET有序集合是 Redis 数据类型之一它是字符串元素的集合且不允许重复的成员。不同的是每个元素都会关联一个 double 类型的分数。Redis 正是通过分数来为集合中的成员进行从小到大的排序。ZSET的成员是唯一的但分数score却可以重复。
基本操作包括添加元素、删除元素、修改元素的分数、查询元素的分数等。以下是一些常用的 ZSET 操作命令
ZADD key score member向 ZSET 中添加一个元素如果元素已存在则更新其分数。ZSCORE key member获取 ZSET 中元素的分数。ZRANGE key start stop [WITHSCORES]按照分数从低到高的顺序返回 ZSET 中指定区间内的元素如果使用了 WITHSCORES 选项则结果中会包含元素的分数。ZREVRANGE key start stop [WITHSCORES]功能与 ZRANGE 相同但是元素是按分数从高到低返回的。ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]返回 ZSET 中分数在 min 和 max 之间的元素。ZREMRANGEBYRANK key start stop删除 ZSET 中排名在给定区间内的所有成员。ZREMRANGEBYSCORE key min max删除 ZSET 中分数在给定区间内的所有成员。ZINCRBY key increment member增加或减少 ZSET 中指定成员的分数。ZCARD key获取 ZSET 的成员数。ZCOUNT key min max计算 ZSET 中分数在 min 和 max 之间的成员数量。ZREM key member [member …]删除 ZSET 中的一个或多个成员。
docker安装
取最新版的 Redis 镜像
docker pull redis:latest运行容器 安装完成后我们可以使用以下命令来运行 redis 容器
$ docker run -itd --name redis-test -p 6379:6379 redis接着我们通过 redis-cli 连接测试使用 redis 服务。
$ docker exec -it redis-test /bin/bash使用示例
# 向名为 myzset 的 ZSET 中添加三个元素
ZADD myzset 1 one 2 two 3 three# 获取 myzset 中的所有元素和它们的分数
ZRANGE myzset 0 -1 WITHSCORES# 获取 myzset 中分数为 2 的成员的数量
ZCOUNT myzset 2 2# 增加元素 one 的分数
ZINCRBY myzset 10 one 代码使用
代码可以参考仓库https://github.com/xiaoxuxiansheng/timewheel/blob/main/redis_time_wheel.go。这里输出启动过程。 数据结构
redis 时间轮
在 redis 版时间轮中有两个核心类第一个是关于时间轮的类定义
redisClient定时任务的存储是基于 redis zset 实现的因此需要内置一个 redis 客户端这部分在 3.2 小节展开httpClient定时任务执行时是通过请求使用方预留回调地址的方式实现的因此需要内置一个 http 客户端channel × 2ticker 和 stopc 对应为 golang 标准库定时器以及停止 goroutine 的控制器
// 基于 redis 实现的分布式版时间轮
type RTimeWheel struct {// 内置的单例工具用于保证 stopc 只被关闭一次sync.Once// redis 客户端redisClient *redis.Client// http 客户端. 在执行定时任务时需要使用到.httpClient *thttp.Client// 用于停止时间轮的控制器 channelstopc chan struct{// 触发定时扫描任务的定时器 ticker *time.Ticker
}定时任务
定时任务的类型定义如下其中包括定时任务的唯一键 key以及执行定时任务回调时需要使用到的 http 协议参数.
// 使用方提交的每一笔定时任务
type RTaskElement struct {// 定时任务全局唯一 keyKey string json:key// 定时任务执行时回调的 http urlCallbackURL string json:callback_url// 回调时使用的 http 方法Method string json:method// 回调时传递的请求参数Req interface{} json:req// 回调时使用的 http 请求头Header map[string]string json:header
}构造器
在构造时间轮实例时使用方需要注入 redis 客户端以及 http 客户端.
在初始化流程中ticker 为 golang 标准库实现的定时器定时器的执行时间间隔固定为 1 s. 此外会异步运行 run 方法启动一个常驻 goroutine生命周期会通过 stopc channel 进行控制.
func NewRTimeWheel(redisClient *redis.Client, httpClient *thttp.Client) *RTimeWheel {r : RTimeWheel{ticker: time.NewTicker(time.Second),redisClient: redisClient,httpClient: httpClient,stopc: make(chan struct{}),}go r.run()return r
}启动与停止
时间轮常驻 goroutine 运行流程同样通过 for select 的形式运行
接收到 stopc 信号时goroutine 退出时间轮停止运行接收到 ticker 信号时开启一个异步 goroutine 用于执行当前批次的定时任务
// 运行时间轮
func (r *RTimeWheel) run() {// 通过 for select 的代码结构运行一个常驻 goroutine 是常规操作for {select {// 接收到终止信号则退出 goroutinecase -r.stopc:return// 每次接收到来自定时器的信号则批量扫描并执行定时任务case -r.ticker.C:// 每次 tick 获取任务go r.executeTasks()}}
}停止时间轮的 Stop 方法通过关闭 stopc 保证常驻 goroutine 能够及时退出.
// 停止时间轮
func (r *RTimeWheel) Stop() {// 基于单例工具保证 stopc 只能被关闭一次r.Do(func() {// 关闭 stopc使得常驻 goroutine 停止运行close(r.stopc)// 终止定时器 tickerr.ticker.Stop()})
}创建任务
在创建定时任务时每笔定时任务需要根据其执行的时间找到从属的分钟时间片.
定时任务真正的存储逻辑定义在一段 lua 脚本中通过 redis 客户端的 Eval 方法执行.
// 添加定时任务
func (r *RTimeWheel) AddTask(ctx context.Context, key string, task *RTaskElement, executeAt time.Time) error {// 前置对定时任务的参数进行校验if err : r.addTaskPrecheck(task); err ! nil {return err}task.Key key// 将定时任务序列化成字节数组taskBody, _ : json.Marshal(task)// 通过执行 lua 脚本实现将定时任务添加 redis zset 中. 本质上底层使用的是 zadd 指令._, err : r.redisClient.Eval(ctx, LuaAddTasks, 2, []interface{}{// 分钟级 zset 时间片r.getMinuteSlice(executeAt),// 标识任务删除的集合r.getDeleteSetKey(executeAt),// 以执行时刻的秒级时间戳作为 zset 中的 scoreexecuteAt.Unix(),// 任务明细string(taskBody),// 任务 key用于存放在删除集合中key,})return err
}//使用示例
if err : rTimeWheel.AddTask(ctx, test1, RTaskElement{CallbackURL: callbackURL,Method: callbackMethod,Req: callbackReq,Header: callbackHeader,
}, time.Now().Add(time.Second)); err ! nil {t.Error(err)return
}// 1 添加任务时如果存在删除 key 的标识则将其删除
// 添加任务时根据时间所属的 min决定数据从属于哪个分片{}
LuaAddTasks local zsetKey KEYS[1]local deleteSetKey KEYS[2]local score ARGV[1]local task ARGV[2]local taskKey ARGV[3]redis.call(srem,deleteSetKey,taskKey)return redis.call(zadd,zsetKey,score,task)下面展示的是获取分钟级定时任务有序表 minuteSlice 以及已删除任务集合 deleteSet 的细节. 我们首先看一下addTaskPrecheck这个函数是对task参数对校验。
func (r *RTimeWheel) addTaskPrecheck(task *RTaskElement) error {if task.Method ! http.MethodGet task.Method ! http.MethodPost {return fmt.Errorf(invalid method: %s, task.Method)}if !strings.HasPrefix(task.CallbackURL, http://) !strings.HasPrefix(task.CallbackURL, https://) {return fmt.Errorf(invalid url: %s, task.CallbackURL)}return nil
}现在看一下getMinuteSlice获取定时任务有序表 key 的方法
func (r *RTimeWheel) getMinuteSlice(executeAt time.Time) string {return fmt.Sprintf(xiaoxu_timewheel_task_{%s}, util.GetTimeMinuteStr(executeAt))
}func GetTimeMinuteStr(t time.Time) string {return t.Format(YYYY_MM_DD_HH_MM)
}例如生成的key是xiaoxu_timewheel_task_{2023-11-05-15:08}
获取删除任务集合 key 的方法
func (r *RTimeWheel) getDeleteSetKey(executeAt time.Time) string {return fmt.Sprintf(xiaoxu_timewheel_delset_{%s}, util.GetTimeMinuteStr(executeAt))
}现在我们看一下Lua脚本
type Client struct {opts *ClientOptionspool *redis.Pool
}
// Eval 支持使用 lua 脚本.
func (c *Client) Eval(ctx context.Context, src string, keyCount int, keysAndArgs []interface{}) (interface{}, error) {args : make([]interface{}, 2len(keysAndArgs))args[0] srcargs[1] keyCountcopy(args[2:], keysAndArgs)conn, err : c.pool.GetContext(ctx)if err ! nil {return -1, err}defer conn.Close()return conn.Do(EVAL, args...)
}// 1 添加任务时如果存在删除 key 的标识则将其删除
// 添加任务时根据时间所属的 min决定数据从属于哪个分片{}
LuaAddTasks local zsetKey KEYS[1]local deleteSetKey KEYS[2]local score ARGV[1]local task ARGV[2]local taskKey ARGV[3]redis.call(srem,deleteSetKey,taskKey)return redis.call(zadd,zsetKey,score,task)这段Go代码定义了一个名为 Eval 的方法这个方法使得 Go 客户端能够通过 Redis 连接执行 Lua 脚本。Eval 方法是如何工作的以及它与 Lua 脚本 LuaAddTasks 是如何配合使用的我们可以逐步解析如下
Eval 方法
这个方法属于 Client 类型接受一个上下文context.ContextLua 脚本的源代码src 字符串键的数量keyCount 整数以及一个包含键和参数的切片keysAndArgs []interface{}。方法的开始首先初始化一个足够大的切片 args 来存储 Lua 脚本的源码、键的数量以及所有的键和参数。然后尝试从连接池 c.pool 中获取一个连接并处理可能出现的错误。如果无法获取连接则返回错误。在连接使用完毕后通过 defer 声明确保连接最终会关闭。使用获得的连接执行 Redis 的 EVAL 命令传入前面构造的 args 切片。
Redis 的 EVAL 命令有什么用?
Redis 的 EVAL 命令用于执行 Lua 脚本。Lua 脚本在 Redis 中的执行是原子性的意味着脚本运行期间Redis 服务器不会执行任何其他命令直到该脚本完成。这为用户提供了在一个执行步骤中执行多个命令的能力这些命令要么全部执行要么全部不执行这类似于数据库的事务。 EVAL 命令的基本用法是
EVAL script numkeys key [key ...] arg [arg ...]script 是要执行的 Lua 脚本代码。numkeys 是键的数量这个参数告诉 Redis 哪些是键参数哪些是普通参数以便它可以正确地处理数据分片和脚本缓存。key [key …] 是传递给脚本的键名这些键名由 numkeys 参数指定。arg [arg …] 是传递给脚本的其他参数这些参数不会被 Redis 当作键来处理。
下面是每部分的详细说明 “local current redis.call(‘get’, KEYS[1]) if current then current redis.call(‘incr’, KEYS[1]) else current redis.call(‘set’, KEYS[1], 1) end return current” 是 Lua 脚本。 首先我们使用 redis.call(‘get’, KEYS[1]) 获取 counter 的当前值。 如果 counter 存在即 current 不为 nil我们执行自增操作 redis.call(‘incr’, KEYS[1])。如果 counter 不存在即 current 为 nil我们使用 redis.call(‘set’, KEYS[1], 1) 设置 counter 的值为 1。最后脚本返回 counter 的新值。 1 是 numkeys 参数指示给 Lua 脚本的键参数数量。counter 是键名这是我们要自增的键。
LuaAddTasks 脚本
这个 Lua 脚本预计接收两个键和三个参数。KEYS[1] 是一个有序集合的键zsetKey用来存储需要添加的任务。KEYS[2] 是一个集合的键deleteSetKey其中包含需要删除的任务键名。ARGV[1] 是一个分数score在有序集合中用来排序任务。ARGV[2] 是任务内容task这是要添加到有序集合的值。ARGV[3] 是任务的键名taskKey在删除集合中用来指定要删除的任务。Lua 脚本首先调用 redis.call(‘srem’, deleteSetKey, taskKey) 来从 deleteSetKey 集合中移除指定的 taskKey。 然后脚本通过 redis.call(‘zadd’, zsetKey, score, task) 将任务 task 与它的分数 score 添加到 zsetKey 的有序集合中并返回该操作的结果。当客户端想要添加一个新任务时它可以使用 Eval 方法执行 LuaAddTasks 脚本。如果添加任务时存在要删除的键那么 Lua 脚本首先会处理这个删除操作接着再添加新任务到对应的有序集合中。这种方式是原子性的也就是说删除和添加操作要么都发生要么都不发生这是利用 Lua 脚本操作 Redis 的一大优势。
下面展示一下创建定时任务流程中 lua 脚本的执行逻辑 删除任务
删除定时任务的方式是将定时任务追加到分钟级的已删除任务 set 中. 之后在检索定时任务时会根据这个 set 对定时任务进行过滤实现惰性删除机制.
// 从 redis 时间轮中删除一个定时任务
func (r *RTimeWheel) RemoveTask(ctx context.Context, key string, executeAt time.Time) error {// 执行 lua 脚本将被删除的任务追加到 set 中._, err : r.redisClient.Eval(ctx, LuaDeleteTask, 1, []interface{}{r.getDeleteSetKey(executeAt),key,})return err
}const( // 删除定时任务 lua 脚本LuaDeleteTask -- 获取标识删除任务的 set 集合的 keylocal deleteSetKey KEYS[1]-- 获取定时任务的唯一键local taskKey ARGV[1]-- 将定时任务唯一键添加到 set 中redis.call(sadd,deleteSetKey,taskKey)-- 倘若是 set 中的首个元素则对 set 设置 120 s 的过期时间local scnt redis.call(scard,deleteSetKey)if (tonumber(scnt) 1)thenredis.call(expire,deleteSetKey,120)endreturn scnt
) 执行定时任务
在执行定时任务时会通过 getExecutableTasks 方法批量获取到满足执行条件的定时任务 list然后并发调用 execute 方法完成定时任务的回调执行.
// 批量执行定时任务
func (r *RTimeWheel) executeTasks() {defer func() {if err : recover(); err ! nil {// log}}()// 并发控制保证 30 s 之内完成该批次全量任务的执行及时回收 goroutine避免发生 goroutine 泄漏tctx, cancel : context.WithTimeout(context.Background(), time.Second*30)defer cancel()// 根据当前时间条件扫描 redis zset获取所有满足执行条件的定时任务tasks, err : r.getExecutableTasks(tctx)if err ! nil {// logreturn}// 并发执行任务通过 waitGroup 进行聚合收口var wg sync.WaitGroupfor _, task : range tasks {wg.Add(1)// shadowtask : taskgo func() {defer func() {if err : recover(); err ! nil {}wg.Done()}()// 执行定时任务if err : r.executeTask(tctx, task); err ! nil {// log}}()}wg.Wait()
}这个 Lua 脚本是为了在 Redis 中处理和管理集合Set类型的数据并且带有某种形式的过期时间管理。具体步骤如下 local deleteSetKey KEYS[1]: 将 Lua 脚本中的第一个键参数赋值给变量 deleteSetKey。这里 KEYS 是一个从 EVAL 命令传入的参数数组代表了键的名称。在 Redis 的 Lua 脚本中KEYS 数组用于传递键名参数。 local taskKey ARGV[1]: 将脚本的第一个非键参数赋值给变量 taskKey。在 EVAL 命令中ARGV 数组用于传递除了键之外的其他参数。 redis.call(‘sadd’, deleteSetKey, taskKey): 使用 sadd 命令将 taskKey 添加到名为 deleteSetKey 的集合中。如果 taskKey 已经是集合的成员则该命令不做任何操作。如果成功添加了新元素它会返回 1。 local scnt redis.call(‘scard’, deleteSetKey): 获取名为 deleteSetKey 的集合的成员数量并将这个数量赋值给变量 scnt。 if (tonumber(scnt) 1) then: 判断 deleteSetKey 集合中元素的数量是否为 1。Lua 脚本中tonumber 函数用于确保 scnt 的值被当作数字处理。 redis.call(‘expire’, deleteSetKey, 120): 如果 deleteSetKey 的集合只有一个成员即刚添加的 taskKey则设置该集合的过期时间为 120 秒。expire 命令用于设置键的生存时间TTL。 return scnt: 脚本返回 deleteSetKey 集合的成员数量。
检索定时任务
最后介绍一下如何根据当前时间获取到满足执行条件的定时任务列表
每次检索时首先根据当前时刻推算出所从属的分钟级时间片然后获得当前的秒级时间戳作为 zrange 指令检索的 score 范围调用 lua 脚本同时获取到已删除任务 set 以及 score 范围内的定时任务 list.通过 set过滤掉被删除的任务然后返回满足执行条件的定时任务
func (r *RTimeWheel) getExecutableTasks(ctx context.Context) ([]*RTaskElement, error) {now : time.Now()// 根据当前时间推算出其从属的分钟级时间片minuteSlice : r.getMinuteSlice(now)// 推算出其对应的分钟级已删除任务集合deleteSetKey : r.getDeleteSetKey(now)nowSecond : util.GetTimeSecond(now)// 以秒级时间戳作为 score 进行 zset 检索score1 : nowSecond.Unix()score2 : nowSecond.Add(time.Second).Unix()// 执行 lua 脚本本质上是通过 zrange 指令结合秒级时间戳对应的 score 进行定时任务检索rawReply, err : r.redisClient.Eval(ctx, LuaZrangeTasks, 2, []interface{}{minuteSlice, deleteSetKey, score1, score2,})if err ! nil {return nil, err}// 结果中首个元素对应为已删除任务的 key 集合后续元素对应为各笔定时任务replies : gocast.ToInterfaceSlice(rawReply)if len(replies) 0 {return nil, fmt.Errorf(invalid replies: %v, replies)}deleteds : gocast.ToStringSlice(replies[0])//获取删除元素集合deletedSet : make(map[string]struct{}, len(deleteds))for _, deleted : range deleteds {deletedSet[deleted] struct{}{}}// 遍历各笔定时任务倘若其存在于删除集合中则跳过否则追加到 list 中返回用于后续执行tasks : make([]*RTaskElement, 0, len(replies)-1)for i : 1; i len(replies); i {var task RTaskElementif err : json.Unmarshal([]byte(gocast.ToString(replies[i])), task); err ! nil {// logcontinue}if _, ok : deletedSet[task.Key]; ok {continue}tasks append(tasks, task)}return tasks, nil
}lua 脚本的执行逻辑如下
( // 扫描 redis 时间轮. 获取分钟范围内,已删除任务集合 以及在时间上达到执行条件的定时任务进行返回LuaZrangeTasks -- 第一个 key 为存储定时任务的 zset keylocal zsetKey KEYS[1]-- 第二个 key 为已删除任务 set 的 keylocal deleteSetKey KEYS[2]-- 第一个 arg 为 zrange 检索的 score 左边界local score1 ARGV[1]-- 第二个 arg 为 zrange 检索的 score 右边界local score2 ARGV[2]-- 获取到已删除任务的集合local deleteSet redis.call(smembers,deleteSetKey)-- 根据秒级时间戳对 zset 进行 zrange 检索获取到满足时间条件的定时任务local targets redis.call(zrange,zsetKey,score1,score2,byscore)-- 检索到的定时任务直接从时间轮中移除保证分布式场景下定时任务不被重复获取redis.call(zremrangebyscore,zsetKey,score1,score2)-- 返回的结果是一个 tablelocal reply {}-- table 的首个元素为已删除任务集合reply[1] deleteSet-- 依次将检索到的定时任务追加到 table 中for i, v in ipairs(targets) doreply[#reply1]vendreturn reply
)总结
本期和大家探讨了如何基于 golang 从零到一实现时间轮算法通过原理结合源码详细展示了单机版和 redis 分布式版时间轮的实现方式. 参考
https://zhuanlan.zhihu.com/p/658079556