dede电影网站模版,html网页案例,如何为wordpress加评论,大桥外语官方网站星做宝贝0.前言
这一节的内容只能解决只有一个服务的情况。要是有多个服务(即是多个结构体#xff09;这种就解决不了#xff0c;也即是没有服务ip地址和服务实例的映射关系。
1.为什么需要注册中心
在上一节中#xff0c;客户端想要找到服务实例的ip,需要硬编码把ip写到代码中。…0.前言
这一节的内容只能解决只有一个服务的情况。要是有多个服务(即是多个结构体这种就解决不了也即是没有服务ip地址和服务实例的映射关系。
1.为什么需要注册中心
在上一节中客户端想要找到服务实例的ip,需要硬编码把ip写到代码中。这时可能会出问题要是该服务实例ip改变了呢该服务实例下线宕机了呢这时如何是好。
// 调用单个服务实例
func clientCall(addr1, addr2 string) {d : xclient.NewMultiServerDiscovery([]string{tcp addr1, tcp addr2})xc : xclient.NewXClient(d, xclient.RandomSelect, nil)defer xc.Close()//省略其他......
}
这时注册中心的重要性就出来了。
注册中心主要有三种角色
服务提供者RPC Server在启动时向 Registry 注册自身服务并向 Registry 定期发送心跳汇报存活状态。服务消费者RPC Client在启动时向 Registry 订阅服务把 Registry 返回的服务节点列表缓存在本地内存中并与 RPC Sever 建立连接。服务注册中心Registry用于保存 RPC Server 的注册信息当 RPC Server 节点发生变更时Registry 会同步变更RPC Client 感知后会刷新本地 内存中缓存的服务节点列表。
最后RPC Client 从本地缓存的服务节点列表中基于负载均衡算法选择一台 RPC Sever 发起调用。 当然注册中心的功能还有很多比如配置的动态同步、通知机制等。比较常用的注册中心有 etcd、zookeeper、consul一般比较出名的微服务或者 RPC 框架这些主流的注册中心都是支持的。
2.Gee Registry
主流的注册中心 etcd、zookeeper 等功能强大与这类注册中心的对接代码量是比较大的需要实现的接口也很多。所以这里我们选择自己实现一个简单的支持心跳保活的注册中心。
GeeRegistry 的代码独立放置在子目录 registry 中。
首先定义 GeeRegistry 结构体默认超时时间设置为 5 min也就是说超过5min没有收到该注册的服务的心跳即视其为不可用状态。
//registry.go
type ServerItem struct {Addr stringstart time.Time //用于心跳时间计算
}// GeeRegistry is a simple register center
type GeeRegistry struct {timeout time.Durationmutex sync.Mutex //protcect serversservers map[string]*ServerItem
}const (defaultPath /_rpc_/registrydefaultTimeout time.Minute * 5
)func New(timeout time.Duration) *GeeRegistry {return GeeRegistry{servers: make(map[string]*ServerItem),timeout: timeout,}
}var DefalultGeeRegister New(defaultTimeout)
然后为 GeeRegistry 实现添加服务实例和返回服务列表的方法。
putServer添加服务实例如果服务已经存在则更新 start。aliveServers返回可用的服务列表如果存在超时的服务则删除。
func (r *GeeRegistry) putServer(addr string) {r.mutex.Lock()defer r.mutex.Unlock()s : r.servers[addr]if s nil {r.servers[addr] ServerItem{Addr: addr, start: time.Now()}} else {s.start time.Now() // if exists, update start time to keep alive}
}func (r *GeeRegistry) aliveServers() []string {r.mutex.Lock()defer r.mutex.Unlock()var alive []stringfor addr, s : range r.servers {if r.timeout 0 || s.start.Add(r.timeout).After(time.Now()) {alive append(alive, addr)} else {delete(r.servers, addr)}}sort.Strings(alive)return alive
} 为了简单那么rpc客户端通过HTTP去访问注册中心且所有的有用信息都承载在 HTTP Header 中。
Get返回所有可用的服务列表通过自定义字段 X-rpc-Servers 承载。Post添加服务实例或发送心跳通过自定义字段 X-rpc-Server 承载。
func (r *GeeRegistry) ServeHTTP(w http.ResponseWriter, req *http.Request) {switch req.Method {case GET:w.Header().Set(X-rpc-Servers, strings.Join(r.aliveServers(), ,))case POST:addr : req.Header.Get(X-rpc-Servers)if addr {w.WriteHeader(http.StatusInternalServerError)return}r.putServer(addr) //更新保存在注册中心的服务实例default:w.WriteHeader(http.StatusMethodNotAllowed)}
}func (r *GeeRegistry) HandleHTTP(registryPath string) {http.Handle(registryPath, r)
}func HandleHTTP() {DefalultGeeRegister.HandleHTTP(defaultPath)
}
另外也要提供 Heartbeat 方法便于服务启动时定时向注册中心发送心跳也是通过HTTP默认周期比注册中心设置的过期时间少 1 min。
// only send once
func sendHeartbeat(registryURL, addr string) error {httpClient : http.Client{Timeout: time.Second * 10}req, _ : http.NewRequest(POST, registryURL, nil)req.Header.Set(X-rpc-Servers, addr)resp, err : httpClient.Do(req)if err ! nil {fmt.Println(rpc server: heart beat err:, err)return err}defer resp.Body.Close()return nil
}// Heartbeat send a heartbeat message every once in a while
func Heartbeat(registryURL, addr string, duration time.Duration) {if duration 0 {duration defaultTimeout - time.Duration(1)*time.Minute}err : sendHeartbeat(registryURL, addr)go func() {//创建一个定时器t : time.NewTicker(duration)for err nil {-t.Cerr sendHeartbeat(registryURL, addr)}}()
}
3.需要注册中心的服务发现
上一节我们实现了一个不需要注册中心服务列表由手工维护的服务发现的结构体MultiServersDiscovery。
而现在我们实现了注册中心那这一节的服务发现就可以继承上一节的并添加与注册中心相关的细节。
type GeeRegistryDiscovery struct {*MultiServerDiscoveryregistryAddr stringtimeout time.Duration //服务列表的过期时间lastUpdate time.Time
}const defaultUpdateTimeout time.Second * 10func NewGeeRegistryDiscovery(registerAddr string, timeout time.Duration) *GeeRegistryDiscovery {if timeout 0 {timeout defaultUpdateTimeout}return GeeRegistryDiscovery{MultiServerDiscovery: NewMultiServerDiscovery(make([]string, 0)),registryAddr: registerAddr,timeout: timeout,}
}
GeeRegistryDiscovery 嵌套了 MultiServersDiscovery很多能力可以复用。registryAddr 即注册中心的地址timeout 服务列表的过期时间lastUpdate 是代表最后从注册中心更新服务列表的时间默认 10s 过期即 10s 之后需要从注册中心更新新的列表。
实现 Update 和 Refresh 方法超时重新获取的逻辑在 Refresh 中实现
func (d *GeeRegistryDiscovery) Update(servers []string) error {d.rwMutex.Lock()defer d.rwMutex.Unlock()d.servers serversd.lastUpdate time.Now()return nil
}// 刷新有了注册中心在客户端每次获取服务实例时候需要刷新注册中心的保存的服务实例
func (d *GeeRegistryDiscovery) Refresh() error {d.rwMutex.Lock()defer d.rwMutex.Unlock()//注册中心保存的服务实例还没超时不用更新if d.lastUpdate.Add(d.timeout).After(time.Now()) {return nil}httpClient : http.Client{Timeout: time.Second * 10} //http客户端最好有个超时resp, err : httpClient.Get(d.registryAddr)if err ! nil {fmt.Println(rpc registry refresh err:, err)return err}defer resp.Body.Close()servers : strings.Split(resp.Header.Get(X-rpc-Servers), ,)d.servers make([]string, 0, len(servers))for _, server : range servers {//返回一个string类型并将最前面和最后面的ASCII定义的空格去掉中间的空格不会去掉s : strings.TrimSpace(server)if s ! {d.servers append(d.servers, s)}}d.lastUpdate time.Now()return nil
} Get 和 GetAll 与 MultiServersDiscovery 相似唯一的不同在于GeeRegistryDiscovery 需要先调用 Refresh 确保服务列表没有过期。
func (d *GeeRegistryDiscovery) Get(mode SelectMode) (string, error) {if err : d.Refresh(); err ! nil {return , err}//d.Get(mode) 表示调用的是(GeeRegistryDiscovery).Getreturn d.MultiServerDiscovery.Get(mode) //d.MultiServerDiscovery是调用MultiServerDiscovery的Get()
}func (d *GeeRegistryDiscovery) GetAll() ([]string, error) {if err : d.Refresh(); err ! nil {return nil, err}return d.MultiServerDiscovery.GetAll()
}
4.测试
添加函数 startRegistry之后需要稍微修改 startServer定期向注册中心发送心跳保活(Heartbeat)。
这里使用sync.WaitGroup是为了等待该操作执行完毕才会往后执行因为这些函数都是新开协程运行。
func startServer(registryAddr string, wg *sync.WaitGroup) {var myServie Myl, _ : net.Listen(tcp, localhost:0) //端口是0表示端口随机server : geerpc.NewServer()//这里一定要用myServie因为前面Sum方法的接受者是*My;若接受者是My,myServie或者myServie都可以server.Register(myServie)registry.Heartbeat(registryAddr, tcpl.Addr().String(), 0) //定时发送心跳wg.Done()server.Accept(l)
}func startRegistry(wg *sync.WaitGroup) {l, _ : net.Listen(tcp, localhost:9999)registry.HandleHTTP()wg.Done()http.Serve(l, nil)
}
接下来将 call 和 broadcast 的 MultiServersDiscovery 替换为 GeeRegistryDiscovery不再需要硬编码服务列表。
这里就重点对比下NewGeeRegistryDiscovery方法和之前的不同之处。
// 调用单个服务实例
func clientCall(registryAddr string) {// d : xclient.NewMultiServerDiscovery([]string{tcp addr1, tcp addr2})d : xclient.NewGeeRegistryDiscovery(registryAddr, 0)xc : xclient.NewXClient(d, xclient.RandomSelect, nil)defer xc.Close()var wg sync.WaitGroupfor i : 0; i 5; i {wg.Add(1)go func(i int) {defer wg.Done()var reply int 1324if err : xc.Call(context.Background(), My.Sum, Args{Num1: i, Num2: i * i}, reply); err ! nil {log.Println(call Foo.Sum error:, err)}fmt.Println(reply: , reply)}(i)}wg.Wait()
}func broadcast(registryAddr string) {// d : xclient.NewMultiServerDiscovery([]string{tcp addr1, tcp addr2})d : xclient.NewGeeRegistryDiscovery(registryAddr, 0)xc : xclient.NewXClient(d, xclient.RandomSelect, nil)defer xc.Close()var wg sync.WaitGroupfor i : 0; i 5; i {wg.Add(1)go func(i int) {defer wg.Done()var reply int 1324if err : xc.Broadcast(context.Background(), My.Sum, Args{Num1: i, Num2: i * i}, reply); err ! nil {fmt.Println(Broadcast call Foo.Sum error:, err)}fmt.Println(Broadcast reply: , reply)ctx, cancel : context.WithTimeout(context.Background(), time.Second*2)defer cancel()var replyTimeout int 1324if err : xc.Broadcast(ctx, My.Sleep, Args{Num1: i, Num2: i * i}, replyTimeout); err ! nil {fmt.Println(Broadcast call Foo.Sum error:, err)}fmt.Println(timeout Broadcast reply: , replyTimeout)}(i)}wg.Wait()
}
最后是main函数。
确保注册中心启动后再启动 RPC 服务端最后客户端远程调用。
func main() {registryAddr : http://localhost:9999/_rpc_/registryvar wg sync.WaitGroupwg.Add(1)go startRegistry(wg) //开启注册中心服务wg.Wait()time.Sleep(time.Second)wg.Add(2)go startServer(registryAddr, wg)go startServer(registryAddr, wg)wg.Wait()time.Sleep(time.Second)clientCall(registryAddr)broadcast(registryAddr)
}
运行结果 代码 https://github.com/liwook/Go-projects/tree/main/geerpc/7-registry