深圳福田区网站建设,网站定制与模板开发,北京网站设计策划公司,正确的域名格式用go实现一个任务调度类 #xff08;泛型#xff09;
源码地址#xff1a; https://github.com/robinfoxnan/BirdTalkServer/blob/main/server/core/workmanager.go
1.概述
实现了一个简单的任务管理系统#xff0c;允许用户定义任务和工作者#xff0c;并将任务分配给…用go实现一个任务调度类 泛型
源码地址 https://github.com/robinfoxnan/BirdTalkServer/blob/main/server/core/workmanager.go
1.概述
实现了一个简单的任务管理系统允许用户定义任务和工作者并将任务分配给工作者进行处理。这个系统旨在提供一个灵活的任务管理框架可以根据需要动态地添加和移除工作者以及处理任务。
2.主要功能
定义了 Task 接口和 Worker 接口用于表示任务和工作者提供了基础的任务类型 BaseTask 和基础的工作者类型 BaseWorker用户可以基于这些基础类型来实现自定义的任务和工作者。需要在 BaseTask结构上继承一个新的结构并实现Process方法实现了一个泛型任务管理器 Manager用于管理工作者并分配任务给工作者。根据最大工作者个数和任务队列长度动态地添加工作者。提供了停止所有工作者的方法提供了方法来等待所有工作者完成任务。
3.类型和接口
3.1Task 任务接口
type Task interface {Process()
}任务接口定义了一个 Process() 方法用于执行任务的处理逻辑。
3.2Worker 接口
type Worker interface {Init(id int64, taskChan chan Task, wg *sync.WaitGroup, f WorkerCleanF)Start()Stop()
}工作者接口定义了三个方法
Init() 方法用于初始化工作者。创建后设置工作者ID任务通道同步组以及一个析构函数类似的清理函数Start() 方法用于启动工作者协程开始处理任务Stop() 方法用于停止工作者关闭通道
3.3BaseTask 结构体
这是一个最基础的示例后续自定义结构可以包含这个结构
type BaseTask struct {Id int64
}基础任务结构体包含一个任务 ID实现了 Task 接口的 Process() 方法用于执行任务的处理逻辑。
3.4BaseWorker 结构体
type BaseWorker struct {Id int64waitGrp *sync.WaitGrouptaskChan chan TaskcleanFun WorkerCleanFquitChan chan struct{}
}基础工作者结构体包含工作者 ID、等待组、任务通道、清理函数和退出通道实现了 Worker 接口的 Init()、Start() 和 Stop() 方法用于初始化工作者、启动工作者和停止工作者。
4. Manager 结构体
type Manager[T Task, W Worker] struct {workers map[int64]W // 使用一个map管理各个协程maxWorkers int64 // 最大协程数量workerCounter int64 // 使用原子方式计数taskChan chan Task // 任务通道lock sync.Mutex // map用的锁wg sync.WaitGroup // 同步组newWorkerFunc func() W // 用于创建泛型中工作者结构的函数exiting int32 // 退出状态标记防止停止过程中加入任务workerIdSeq int64 // 协程序号可以用雪花算法代替一般应该够用
}任务管理器结构体包含了一个工作者映射、最大工作者数量、工作者计数器、任务通道、互斥锁、等待组、新建工作者函数、退出标志和工作者 ID 序列提供了方法来添加任务、移除工作者、等待所有工作者完成任务和停止所有工作者。
5. 使用示例
最简单的一个测试示例 manager : NewManager[Task, *BaseWorker](20, NewBaseWorker)// 添加示例任务到管理器go func() {for i : 0; i 10; i {var t BaseTask{Id: int64(i)}manager.AddTask(t)}}()time.Sleep(time.Minute * 1)manager.StopAll()// 等待所有工作者完成任务manager.Wait()我们需要重新定义一个结构用于表示任务通常需要更多的字段
type CustomTask struct {BaseTaskAdditionalInfo string// 这里添加更多的字段
}// 实现 Task 接口的 Run 方法
// 必须要实现这个函数这是任务调度的功能入口在协程中运行
func (t *CustomTask) Process() {fmt.Printf(CustomTask with additional info %s is running\n, t.AdditionalInfo)// 调用父类的 Process 方法//t.BaseTask.Process()
}
重写测试 func TestWorkers(t *testing.T) {manager : NewManager[Task, *BaseWorker](20, NewBaseWorker)// 添加示例工作者到管理器// 添加示例任务到管理器go func() {for i : 0; i 10; i {var t BaseTask{Id: int64(i)}manager.AddTask(t)}for i : 10; i 16; i {var t CustomTask{BaseTask: BaseTask{Id: int64(i)}, AdditionalInfo: Custom Info}manager.AddTask(t)}}()time.Sleep(time.Minute * 1)manager.StopAll()// 等待所有工作者完成任务manager.Wait()
}结论
各个语言实现的这个轮子基本都差不多。