网站维护升级,torrentkitty磁力天堂,网页的框架结构图,携程旅行网站建设分析前言
到目前为止我们的框架已经有了一部分服务治理的功能#xff0c;这次我们在之前的基础上实现一些其他功能。篇幅所限这里只列举部分实现
zookeeper注册中心
实现我们之前的注册中心的接口即可#xff0c;这里使用了docker的libkv而不是直接用zk客户端#xff08;从rp…前言
到目前为止我们的框架已经有了一部分服务治理的功能这次我们在之前的基础上实现一些其他功能。篇幅所限这里只列举部分实现
zookeeper注册中心
实现我们之前的注册中心的接口即可这里使用了docker的libkv而不是直接用zk客户端从rpcx那学的libkv封装了对于几种存储服务的操作包括Consul、Etcd、Zookeeper和BoltDB后续如果要支持其他类型的存储就得自己写客户端了。基于zk的注册中心的定义如下
type ZookeeperRegistry struct {AppKey string //一个ZookeeperRegistry实例和一个appkey关联ServicePath string //数据存储的基本路径位置比如/service/providersUpdateInterval time.Duration //定时拉取数据的时间间隔kv store.Store //封装过的zk客户端providersMu sync.RWMutexproviders []registry.Provider //本地缓存的列表watchersMu sync.Mutexwatchers []*Watcher //watcher列表
}
初始化部分逻辑如下
func NewZookeeperRegistry(AppKey string, ServicePath string, zkAddrs []string,updateInterval time.Duration, cfg *store.Config) registry.Registry {zk : new(ZookeeperRegistry)zk.AppKey AppKeyzk.ServicePath ServicePathzk.UpdateInterval updateIntervalkv, err : libkv.NewStore(store.ZK, zkAddrs, cfg)if err ! nil {log.Fatalf(cannot create zk registry: %v, err)}zk.kv kvbasePath : zk.ServicePathif basePath[0] / { //路径不能以/开头basePath basePath[1:]zk.ServicePath basePath}//先创建基本路径err zk.kv.Put(basePath, []byte(base path), store.WriteOptions{IsDir: true})if err ! nil {log.Fatalf(cannot create zk path %s: %v, zk.ServicePath, err)}//显式拉取第一次数据zk.doGetServiceList()go func() {t : time.NewTicker(updateInterval)for range t.C {//定时拉取数据zk.doGetServiceList()}}()go func() {//后台watch数据zk.watch()}()return zk
}
我们在初始化注册中心时执行两个后台任务定时拉取和监听数据相当于推拉结合的方式。同时监听获得的数据是全量数据因为实现起来简单一些后续如果服务列表越来越大时可能需要加上基于版本号的机制或者只传输增量数据。这里额外指出几个要点
后台定时拉取数据并缓存起来查询时直接返回缓存注册时在zk添加节点注销时在zk删除节点监听时并不监听每个服务提供者而是监听其父级目录有变更时再统一拉取服务提供者列表这样可以减少watcher的数目逻辑也更简单一些因为第4点所以注册和注销时需要更改父级目录的内容lastUpdate来触发监听
具体的注册注销逻辑这里不再列举参考github
客户端心跳
如果我们使用zk作为注册中心更简单的做法可能是直接将服务提供者作为临时节点添加到zk上这样就可以利用临时节点的特性实现动态的服务发现。但是我们使用的libkv库并不支持临时节点的功能而且除了zk其他存储服务比如etcd等可能也不支持临时节点的特性所以我们注册到注册中心的都是持久节点。在这种情况下可能某些由于特殊情况无法访问的服务提供者并没有及时地将自身从注册中心注销掉所以客户端需要额外的能力来判断一个服务提供者是否可用而不是完全依赖注册中心。 所以我们需要增加客户端心跳的支持客户端可以定时向服务端发送心跳请求服务端收到心跳请求时可以直接返回只要通知客户端自身仍然可用就行。客户端可以根据设置的阈值对心跳失败的服务提供者进行降级处理直到心跳恢复或者服务提供者被注销掉。客户端发送心跳逻辑如下
func (c *sgClient) heartbeat() {if c.option.HeartbeatInterval 0 {return}//根据指定的时间间隔发送心跳t : time.NewTicker(c.option.HeartbeatInterval)for range t.C {if c.shutdown {t.Stop()return}//遍历每个RPCClient进行心跳检查c.clients.Range(func(k, v interface{}) bool {err : v.(RPCClient).Call(context.Background(), , , nil)c.mu.Lock()if err ! nil {//心跳失败进行计数if fail, ok : c.clientsHeartbeatFail[k.(string)]; ok {failc.clientsHeartbeatFail[k.(string)] fail} else {c.clientsHeartbeatFail[k.(string)] 1}} else {//心跳成功则进行恢复c.clientsHeartbeatFail[k.(string)] 0c.serversMu.Lock()for i, p : range c.servers {if p.ProviderKey k {delete(c.servers[i].Meta, protocol.ProviderDegradeKey)}}c.serversMu.Unlock()}c.mu.Unlock()//心跳失败次数超过阈值则进行降级if c.clientsHeartbeatFail[k.(string)] c.option.HeartbeatDegradeThreshold {c.serversMu.Lock()for i, p : range c.servers {if p.ProviderKey k {c.servers[i].Meta[protocol.ProviderDegradeKey] true}}c.serversMu.Unlock()}return true})}
}鉴权
鉴权的实现比较简单客户端可以在元数据中携带鉴权相关的信息而服务端可以通过指定的Wrapper进行鉴权。服务端Wrapper的代码如下
type AuthFunc func(key string) bool
type ServerAuthInterceptor struct {authFunc AuthFunc
}
func NewAuthInterceptor(authFunc AuthFunc) Wrapper {return ServerAuthInterceptor{authFunc}
}
func (sai *ServerAuthInterceptor) WrapHandleRequest(s *SGServer, requestFunc HandleRequestFunc) HandleRequestFunc {return func(ctx context.Context, request *protocol.Message, response *protocol.Message, tr transport.Transport) {if auth, ok : ctx.Value(protocol.AuthKey).(string); ok {//鉴权通过则执行业务逻辑if sai.authFunc(auth) {requestFunc(ctx, response, response, tr)return}}//鉴权失败则返回异常s.writeErrorResponse(response, tr, auth failed)}
}熔断降级
暂时实现了简单的基于时间窗口的熔断器实现如下
type CircuitBreaker interface {AllowRequest() boolSuccess()Fail(err error)
}
type DefaultCircuitBreaker struct {lastFail time.Timefails uint64threshold uint64window time.Duration
}
func NewDefaultCircuitBreaker(threshold uint64, window time.Duration) *DefaultCircuitBreaker {return DefaultCircuitBreaker{threshold: threshold,window: window,}
}
func (cb *DefaultCircuitBreaker) AllowRequest() bool {if time.Since(cb.lastFail) cb.window {cb.reset()return true}failures : atomic.LoadUint64(cb.fails)return failures cb.threshold
}
func (cb *DefaultCircuitBreaker) Success() {cb.reset()
}
func (cb *DefaultCircuitBreaker) Fail() {atomic.AddUint64(cb.fails, 1)cb.lastFail time.Now()
}
func (cb *DefaultCircuitBreaker) reset() {atomic.StoreUint64(cb.fails, 0)cb.lastFail time.Now()
}
结语
这次的内容就到此为止有任何意见或者建议欢迎指正。