做吃穿住行网站,wordpress 加入js,wordpress添加单页,python制作网页教程Etcd 相关参考资料
Etcd 的介绍与使用#xff1a;Etcd 介绍与使用#xff08;入门篇#xff09;-CSDN博客
Etcd Raft 协议#xff1a;Etcd Raft 协议#xff08;进阶篇#xff09;-CSDN博客
本文诣在使用 Go 客户端操作 Etcd#xff0c;并实现元数据的写入#xff0…Etcd 相关参考资料
Etcd 的介绍与使用Etcd 介绍与使用入门篇-CSDN博客
Etcd Raft 协议Etcd Raft 协议进阶篇-CSDN博客
本文诣在使用 Go 客户端操作 Etcd并实现元数据的写入单条写、批量写、读取单挑读、前缀读、监听watch、更新update
注数据是以 Protobuf 格式存储到 Etcd 的
Protobuf 的介绍与使用Protobuf 的介绍与使用入门级-CSDN博客
完整代码
./src/message/metadata.proto
syntax proto3;
package message;option go_package /data/etcd_test/src/message;message Metadata {string Name 1;int64 ShardCount 2;
} ./src/etcd/main.go
package mainimport (errorsstrings//encoding/jsonetcd_test/src/messagefmtgithub.com/golang/protobuf/protogo.etcd.io/etcd/client/v3golang.org/x/net/contexttime
)// 连接etcd集群
func connectToEtcd(args ...string) (*clientv3.Client, error) {if len(args) 0 {return nil, errors.New(ip or port is nil)}var endpoints []stringfor _, arg : range args {endpoints append(endpoints, arg)}// 连接etcd集群cli, err : clientv3.New(clientv3.Config{Endpoints: endpoints,DialTimeout: 5 * time.Second,})if err ! nil {return nil, errors.New(connect etcd fail)}return cli, nil
}// 初始化元数据
func initMetadata() (map[string]*message.Metadata, error) {// 初始化一个切片用于存储元数据metadata : make(map[string]*message.Metadata)// 初始化meta_tables实例data1 : message.Metadata{Name: dist_by_mm, ShardCount: 12}keyName1 : meta_tablemetadata[keyName1] data1data2 : message.Metadata{Name: dist_by_mm, ShardCount: 14}keyName2 : meta_table1metadata[keyName2] data2if len(metadata) 0 {return nil, errors.New(metadata init error)}return metadata, nil
}// 将元数据序列化为protobuf并存储到etcd(单条循环写入)
func putMetadataToEtcdSingle(cli *clientv3.Client, metadata map[string]*message.Metadata) error {if len(metadata) 0 {return errors.New(metadata list is nil)}// 遍历元数据列表逐个写入etcdfor k, v : range metadata {// 将元数据序列化为protobufprotoData, err : proto.Marshal(v)if err ! nil {return errors.New(metadata to protobuf fail)}// 存储protobuf数据到etcdctx, cancel : context.WithTimeout(context.Background(), 5*time.Second)defer cancel()_, err cli.Put(ctx, k, string(protoData))if err ! nil {return errors.New(metadata write fail)}}return nil
}// 将元数据序列化为protobuf并存储到etcd(批量写入)
func putMetadataToEtcdBatch(cli *clientv3.Client, metadata map[string]*message.Metadata) error {// 创建事务txn : cli.Txn(context.Background())meta : make(map[string]string)for k, v : range metadata {// 将元数据序列化为protobufprotoData, err : proto.Marshal(v)if err ! nil {return errors.New(metadata to protobuf fail)}meta[k] string(protoData)}// 组装多个写入操作txn.If(clientv3.Compare(clientv3.CreateRevision(meta_table), , 0)).// 此处不允许相同的键重复写入Then(clientv3.OpPut(meta_table, meta[meta_table]),clientv3.OpPut(meta_table1, meta[meta_table1]),)// 提交事务resp, err : txn.Commit()if err ! nil {// handle error!fmt.Printf(txn commit failed, err: %v\n, err)return nil}// 检查事务结果if !resp.Succeeded {fmt.Println(txn failed, some keys already exist)} else {fmt.Println(txn succeeded)}return nil
}func getMetadataFromEtcd(cli *clientv3.Client, metadataMap map[string]*message.Metadata, args ...interface{}) error {if len(args) 0 {return errors.New(not key need read)}// 遍历key并从etcd中读取对应的key-valfor _, arg : range args {switch v : arg.(type) {case map[string]*message.Metadata: // 以map的形式指定key(即读取刚写入的数据){// map为空if len(v) 0 {return errors.New(key map is nil)} else {for k, _ : range v {ctx, cancel : context.WithTimeout(context.Background(), 5*time.Second)meta, err : cli.Get(ctx, k)cancel()if err ! nil {return errors.New(read matadata fail)}if len(meta.Kvs) 0 {return errors.New(no key-value found for the given key)}// 把protobuf解析为Metadatavar retrievedMetadata message.Metadataerr proto.Unmarshal(meta.Kvs[0].Value, retrievedMetadata)if err ! nil {return errors.New(protobuf to Metadata fail)}metadataMap[k] retrievedMetadata}}break}case string: // 单个读或批量读{if strings.Contains(v, prefix) { // prefix读str : strings.Split(v, :)key : str[0]// 创建事务txn : cli.Txn(context.Background())ctx, cancel : context.WithTimeout(context.Background(), 5*time.Second)meta, err : cli.Get(ctx, key, clientv3.WithPrefix())cancel()if err ! nil {return errors.New(read matadata error)}if len(meta.Kvs) 0 {return errors.New(no key-value found for the given key)}for _, m : range meta.Kvs {// 把protobuf解析为Metadatavar val message.Metadataerr proto.Unmarshal(m.Value, val)if err ! nil {return errors.New(protobuf to Metadata fail)}metadataMap[string(m.Key)] val}// 提交事务resp, err : txn.Commit()if err ! nil {return errors.New(txn commit failed)}// 检查事务结果if !resp.Succeeded {return errors.New(txn failed, some keys already exist)} else {return errors.New(txn succeeded)}} else { // 单条读ctx, cancel : context.WithTimeout(context.Background(), 5*time.Second)meta, err : cli.Get(ctx, v)cancel()if err ! nil {return errors.New(read matadata error)}if len(meta.Kvs) 0 {return errors.New(no key-value found for the given key)}var val message.Metadataerr proto.Unmarshal(meta.Kvs[0].Value, val)if err ! nil {return errors.New(protobuf to Metadata fail)}metadataMap[string(meta.Kvs[0].Key)] val}break}default:return errors.New(args is unknown data type)}}return nil
}// 监听etcd上指定key的变化如果发生变化则更新缓存中的元数据
func watchEtcdAndUpdate(cli *clientv3.Client, metadataMap map[string]*message.Metadata, key string) error {if key {return errors.New(not key need watch)}ctx, cancel : context.WithCancel(context.Background())defer cancel()watch : cli.Watch(ctx, key, clientv3.WithPrefix(), clientv3.WithPrevKV())for w : range watch {for _, ev : range w.Events {fmt.Printf(Type: %s Key: %s Value: %s\n, ev.Type, ev.Kv.Key, ev.Kv.Value)// 根据变化修改缓存中的元数据switch ev.Type {case clientv3.EventTypePut:{var val message.Metadataerr : proto.Unmarshal(ev.Kv.Value, val)if err ! nil {return errors.New(protobuf to Metadata fail)}metadataMap[string(ev.Kv.Key)] valfor k, v : range metadataMap {fmt.Println(tableName:, k)fmt.Println(metadata ShardCount:, v.GetShardCount())}break}case clientv3.EventTypeDelete:{if ev.PrevKv.Key ! nil {delete(metadataMap, string(ev.Kv.Key))}break}default:return errors.New(watch error)}}}return nil
}func main() {// 初始化一个缓存模拟其它kingproxy节点metadataMap : make(map[string]*message.Metadata)// 连接etcd集群cli, err : connectToEtcd(120.92.144.250:2379)if err ! nil {fmt.Println(err)return}defer cli.Close()// 初始化元数据metadata, err : initMetadata()if err ! nil {fmt.Println(err)return}// 写入元数据//err putMetadataToEtcdSingle(cli, metadata)//if err ! nil {// fmt.Println(err)// return//}// 批量写入元数据err putMetadataToEtcdBatch(cli, metadata)if err ! nil {fmt.Println(err)return}// 读取元数据getMetadataFromEtcd(cli, metadataMap, meta_table:prefix)// pre resultfor k, v : range metadataMap {fmt.Println(tableName:, k)fmt.Println(metadata ShardCount:, v.GetShardCount())}fmt.Println(--------------------------------------------------------------------------------)// watch key//watchEtcdAndUpdate(cli, metadataMap, meta_table)return
}
完整 Go 工程
完整 Go 工程获取https://download.csdn.net/download/weixin_47156401/89019290