便宜网站建设,素材下载网站,上海制作网站公司,微信小程序开发需要什么技能引言
最近做了一个需求#xff0c;是定时任务相关的。以前定时任务都是通过 linux crontab 去实现的#xff0c;现在服务上云(k8s)了#xff0c;尝试了 k8s 的 CronJob#xff0c;由于公司提供的是界面化工具#xff0c;使用、查看起来很不方便。于是有了本文#xff0c…引言
最近做了一个需求是定时任务相关的。以前定时任务都是通过 linux crontab 去实现的现在服务上云(k8s)了尝试了 k8s 的 CronJob由于公司提供的是界面化工具使用、查看起来很不方便。于是有了本文通过一个单 pod 去实现一个常驻服务去跑定时任务。 经过筛选选用了 cron 这个库它支持 linux cronjob 语法取配置定时任务还支持every 10s、hourly 等描述符去配置定时任务完全满足我们要求比如下面的例子
package mainimport (fmtgithub.com/natefinch/lumberjackgithub.com/robfig/cron/v3github.com/sirupsen/logrus
)type CronLogger struct {clog *logrus.Logger
}func (l *CronLogger) Info(msg string, keysAndValues ...interface{}) {l.clog.WithFields(logrus.Fields{data: keysAndValues,}).Info(msg)
}func (l *CronLogger) Error(err error, msg string, keysAndValues ...interface{}) {l.clog.WithFields(logrus.Fields{msg: msg,data: keysAndValues,}).Warn(err.Error())
}func main() {logger : logrus.New()_logger : lumberjack.Logger{Filename: ./test.log,MaxSize: 50,MaxAge: 15,MaxBackups: 5,}logger.SetOutput(_logger)logger.SetFormatter(logrus.JSONFormatter{DisableHTMLEscape: true,})c : cron.New(cron.WithLogger(CronLogger{clog: logger,}))c.AddFunc(*/5 * * * *, func() {fmt.Println(你的流量包即将过期了)})c.AddFunc(*/2 * * * *, func() {fmt.Println(你的转码包即将过期了)})c.Start()for {select {}}
}使用了 cronjob、并结合了 golang 的 log 组建输出日志到文件使用很方便。
但是在使用过程中发现还有些不足缺少某些功能比如我很想使用的查看任务列表。
类库介绍
扩展性强
此类库扩展性挺强通过 JobWrapper 去包装一个任务NewChain(w1, w2, w3).Then(job)相关实现如下
type JobWrapper func(Job) Job
type Chain struct {wrappers []JobWrapper
}
func NewChain(c ...JobWrapper) Chain {return Chain{c}
}
func (c Chain) Then(j Job) Job {for i : range c.wrappers {j c.wrappers[len(c.wrappers)-i-1](j)}return j
}比如当前脚本如果还没有执行完下次任务时间又到了就可以通过如下默认提供的 wrapper 去避免继续执行。可以看到最后执行的任务 j.Run() 被包装在了一个函数闭包中并且根据闭包中的 channel 去判断是否执行避免重复执行。首次执行的时候容量为 1 的 channel 中已经有数据了重复执行时channel 无数据默认跳过等上次任务执行完成后又像 channel 中写入一条数据下次 channel 可以读出数据又可以执行任务了
func SkipIfStillRunning(j Job) Job {var ch make(chan struct{}, 1)ch - struct{}{}return FuncJob(func() {select {case v : -ch:defer func() { ch - v }()j.Run()default:// skip}})
}主流程
cron 主流程是启动一个协程里面有双重 for 循环下面我们来一起分析一下。
定时器
第一层循环首先计算下次最早执行任务的时间跟当前时间间隔 gap然后设置定时器为 gap这里很巧妙定时器间隔不是 1s/次而是跟下次任务的时间相关这样就避免了无用的定时器循环也让执行时间更精准不存在设置小了浪费资源设置大了误差大的情况。接下来进入第二层循环。
sort.Sort(byTime(c.entries))
timer time.NewTimer(c.entries[0].Next.Sub(now))事件循环
事件循环中包含了很多事件比如 添加任务、停止、移除任务当 cron 启动之后这些任务都是异步的。比如添加任务不会直接将任务信息写入内存中而是进入事件循环加入之后重新计算第一二层循环避免了正在修改任务信息又执行任务信息然后出错的情况。
有人可能会问了为何不在事件中加锁这样也能避免内存竞争。我想说我们执行的是脚本任务有的事件可能很长可能会阻塞有些事件所以这些事件都放在循环中避免了加锁也满足了要求。
for {select {case now -timer.C:// 执行任务case newEntry : -c.add:// 添加任务case replyChan : -c.snapshot:// 获取任务信息case -c.stop:// 停止任务case id : -c.remove:// 移除任务}break
}类库改造
在了解了项目的基本情况之后对项目做了部分改造方便使用。
打印任务列表信息
在主循环汇总加入了信号量监听当触发信号量 SIGUSR1将任务信息输出到日志
usrSig : make(chan os.Signal, 1)
signal.Notify(usrSig, syscall.SIGUSR1)for {select {case -usrSig:// 启动单独的协程去打印定时任务执行信息continue}break
}根据名称移除脚本
目前脚本只能根据脚本 id 去移除要执行的任务执行过程中也不能通过命令去移除任务不是太方便。比如有个脚本马上要执行了但是该脚本发现问题了这时候生产环境的话就需要更新代码然后重启服务去下线脚本任务这时候黄花菜可能都凉了。
所以我也是通过信号量来处理运行之后运行中移除任务的问题收到信号量之后读取文件中的内容根据命令去处理 runing 中的内存
usrSig2 : make(chan os.Signal, 1)
signal.Notify(usrSig2, syscall.SIGUSR2)......
case -usrSig2:actionByte, err : os.ReadFile(/tmp/cron.action)...... //校验命令正确性action : strings.Fields(string(actionByte))switch action[0] {case removeTag:timer.Stop()now c.now()c.removeEntryByTag(action[1])c.logger.Info(removedByTag, tag, action[1])}
......改造效果
由于原项目已经 2 年多没有个更新过了就算发起 pr 估计也不会被处理所以 fork 一份放在了这里 aizuyan/cron 进行改造下面是改进之后的代码
package mainimport (// 加载配置文件fmtgithub.com/aizuyan/cron/v3
)func main() {c : cron.New(cron.WithLogger(cron.DefaultLogger))c.AddFuncWithTag(流量包过期, */5 * * * *, func() {fmt.Println(你的流量包即将过期了)})c.AddFuncWithTag(转码包过期, */2 * * * *, func() {fmt.Println(你的转码包即将过期了)})c.Start()for {select {}}
}对每个定时任务增加了一个名称标识当任务启动后当我们执行 kill -SIGUSR1 pid 的时候会看到 stdout 输出了运行的任务列表信息
-----------------------------------------------------------------------
| ID | TAG | SPEC | PREV | NEXT |
-----------------------------------------------------------------------
| 2 | 转码包过期 | */2 * * * * | 0001-01-01 00:00:00 | 2023-04-02 17:22:00 |
| 1 | 流量包过期 | */5 * * * * | 0001-01-01 00:00:00 | 2023-04-02 17:25:00 |
-----------------------------------------------------------------------执行 kill -SIGUSR2 pid移除转码包过期任务避免了使用 ID 容易出错的问题。
cat /tmp/cron.action
removeTag 转码包过期
// {data:[tag,转码包过期],level:info,msg:removedByTag,time:2023-04-02T18:32:5608:00}放目前为止是不是更好用了基本能满足我们的需求了也可以自己去再做各种扩展。