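Back in November 2013, when the Raft paper was only available online as a draft, I wrote a blog post giving a brief analysis of it. Four years later, explanations of the Raft protocol are everywhere, and Raft has indeed been widely adopted. Its best-known user is probably etcd. etcd implements the Raft protocol itself as a library, located at https://github.com/coreos/etcd/tree/master/raft, and etcd itself is an application built on that library.

This post does not explain the core of the Raft protocol. Instead, it takes the point of view of someone using the etcd raft library and covers what you need to know to put the library to work.

The library is somewhat cumbersome to use. There is an official usage example at https://github.com/coreos/etcd/tree/master/contrib/raftexample. Broadly speaking, the library implements the core of the Raft protocol: log appending, leader election, snapshots, membership changes and so on.

One point must be clear: the library does not implement network transport, either sending or receiving. It only keeps outgoing messages in memory. A user-defined transport layer takes those messages out and sends them. On the receiving side, the application must call a library function to feed incoming messages into the library (details below). The library also defines a Storage interface, which the library's user must implement.

The Storage interface is as follows:

// Storage is an interface that may be implemented by the application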
// to retrieve log entries from storage.
//
// If any Storage method returns an error, the raft instance will
// become inoperable and refuse to participate in elections; the
// application is responsible for cleanup and recovery in this case.
type Storage interface {
	// InitialState returns the saved HardState and ConfState information.
	InitialState() (pb.HardState, pb.ConfState, error)
	// Entries returns a slice of log entries in the range [lo,hi).
	// MaxSize limits the total size of the log entries returned, but
	// Entries returns at least one entry if any.
	Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
	// Term returns the term of entry i, which must be in the range
	// [FirstIndex()-1, LastIndex()]. The term of the entry before
	// FirstIndex is retained for matching purposes even though the
	// rest of that entry may not be available.
	Term(i uint64) (uint64, error)
	// LastIndex returns the index of the last entry in the log.
	LastIndex() (uint64, error)
	// FirstIndex returns the index of the first log entry that is
	// possibly available via Entries (older entries have been incorporated
	// into the latest Snapshot; if storage only contains the dummy entry the
	// first log entry is not available).
	FirstIndex() (uint64, error)
	// Snapshot returns the most recent snapshot.
	// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
	// so raft state machine could know that Storage needs some time to prepare
	// snapshot and call Snapshot later.
	Snapshot() (pb.Snapshot, error)
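}

The library calls these methods internally, and anyone familiar with the Raft protocol will find them easy to follow. The official example mentioned above, https://github.com/coreos/etcd/tree/master/contrib/raftexample, uses the library's built-in MemoryStorage together with etcd's wal and snap packages for persistence. On restart, it reads the log and snapshot back from wal and snap to rebuild the MemoryStorage.

For an IO- and network-intensive component like this, batching is the best way to raise throughput, and the etcd raft library does exactly that.

The core abstraction etcd provides for this is the Ready struct:

// Ready encapsulates the entries and messages that are ready to read,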
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
	// The current volatile state of a Node.
	// SoftState will be nil if there is no update.
	// It is not required to consume or store SoftState.
	*SoftState

	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to empty state if there is no update.
	pb.HardState

	// ReadStates can be used for node to serve linearizable read requests locally
	// when its applied index is greater than the index in ReadState.
	// Note that the readState will be returned when raft receives msgReadIndex.
	// The returned is only valid for the request that requested to read.
	ReadStates []ReadState

	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry

	// Snapshot specifies the snapshot to be saved to stable storage.
	Snapshot pb.Snapshot

	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry

	// Messages specifies outbound messages to be sent AFTER Entries are
	// committed to stable storage.
	// If it contains a MsgSnap message, the application MUST report back to raft
	// when the snapshot has been received or has failed by calling ReportSnapshot.
	Messages []pb.Message

	// MustSync indicates whether the HardState and Entries must be synchronously
	// written to disk or if an asynchronous write is permissible.
	MustSync bool
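}

In short, a Ready struct packages one batch of updates, including:

- pb.HardState: the highest term this node has seen, whom it voted for in that term, and the commit index it knows of
- Entries: log entries to be persisted to stable storage before Messages are sent
- Messages: outbound messages to be sent to other peers
- CommittedEntries: entries that have been committed but not yet applied to the state machine
- Snapshot: a snapshot to be persisted

The library's user keeps popping Ready values from a ready channel provided by the node struct and processes each one in turn. The channel is obtained through:

func (n *node) Ready() <-chan Ready { return n.readyc }

The application processes each Ready as follows:

1. Persist HardState, Entries and Snapshot to storage.
2. Send Messages (the raft struct's msgs mailbox) to the other peers without blocking.
3. Apply CommittedEntries (committed but not yet applied) to the state machine.
4. If CommittedEntries contains a membership-change entry, call the node's ApplyConfChange() method so the node learns about it. This differs from the Raft paper, where a node starts using a membership change as soon as it receives the entry.
5. Call Node.Advance() to tell the raft node that this batch has been handled and the state has moved forward, so it can hand out the next Ready.

The application starts a Raft replica with raft.StartNode(). Internally, this function launches a goroutine running

func (n *node) run(r *raft)

to serve requests.

The application proposes a request to Raft by calling

func (n *node) Propose(ctx context.Context, data []byte) error

This call returns once Raft has started processing the request, not once the request is committed.

Nodes are added or removed by calling

func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error

The node struct holds several important channels:

// node is the canonical implementation of the Node interface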
type node struct {
	propc      chan pb.Message
	recvc      chan pb.Message
	confc      chan pb.ConfChange
	confstatec chan pb.ConfState
	readyc     chan Ready
	advancec   chan struct{}
	tickc      chan struct{}
	done       chan struct{}
	stop       chan struct{}
	status     chan chan Status

	logger Logger
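}

propc: propc is an unbuffered channel. A request submitted through Propose is wrapped in a Message (of type MsgProp) and pushed into propc. The node's run method pops the Message from propc and steps it into the raft state machine. A leader appends the entry to its own raft log and queues append messages for its peers in its mailbox (the msgs []pb.Message field of the raft struct). A follower instead forwards the proposal to the leader. msgs is later packaged into a Ready, which the application takes from readyc and sends out through its own transport.

recvc: when the application's transport receives a Message, it must call

func (n *node) Step(ctx context.Context, m pb.Message) error

to put the Message into recvc. After processing, any messages that need to go out are likewise placed in the mailbox for the corresponding peers and are later sent through the custom transport.

readyc/advancec: both are unbuffered channels. Inside node.run(), related state updates are packaged into a Ready struct and pushed into readyc; the msgs mentioned above are one of these updates. The application pops a Ready from readyc and handles the state it carries. When it is done, it calls Advance() (rc.node.Advance() in raftexample), which pushes an empty struct into advancec to tell raft that this batch has been handled. Once node.run() receives that notification from advancec, it updates some internal state. For example, it drops entries that are now persisted in storage from the in-memory unstable log (type unstable struct).

tickc: the application periodically pushes an empty struct into tickc, and node.run() then calls the tick() function. For a leader, tick() sends heartbeats to the other peers. For a follower, it checks whether it is time to start an election.

confc/confstatec: the application takes CommittedEntries from a Ready. If it finds a membership-change entry, it must call

func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState

This function pushes the ConfChange into confc, which is also unbuffered. node.run() takes the ConfChange from confc, actually adds or removes the peer, and then pushes the latest membership into confstatec. ApplyConfChange pops that membership from confstatec and returns it to the application.

In short, there is quite a lot to understand before you can put the etcd raft library to use.

Reposted from: https://www.cnblogs.com/foxmailed/p/7137431.html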
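The Storage interface quoted above is easier to follow once you see its dummy-entry convention. The following Go sketch is a toy, stdlib-only log store. It is not etcd's MemoryStorage. Entry, memLog, errCompacted, errUnavailable, Append and Compact are hypothetical names invented for illustration, and maxSize is approximated by summing Data lengths. The sketch shows why Term accepts FirstIndex()-1 and why Entries returns at least one entry even when maxSize is small.

```go
package main

import "errors"

// Entry is a simplified, hypothetical stand-in for raftpb.Entry.
type Entry struct {
	Index uint64
	Term  uint64
	Data  []byte
}

var (
	errCompacted   = errors.New("requested index has been compacted")
	errUnavailable = errors.New("requested index is beyond the last entry")
)

// memLog is a toy log store (not etcd's MemoryStorage). ents[0] is a dummy
// entry recording the index and term of the last entry folded into a
// snapshot, so Term(FirstIndex()-1) keeps working after compaction.
type memLog struct {
	ents []Entry
}

func newMemLog() *memLog { return &memLog{ents: []Entry{{}}} }

func (m *memLog) offset() uint64 { return m.ents[0].Index }

func (m *memLog) FirstIndex() (uint64, error) { return m.offset() + 1, nil }

func (m *memLog) LastIndex() (uint64, error) {
	return m.offset() + uint64(len(m.ents)) - 1, nil
}

// Term accepts i in [FirstIndex()-1, LastIndex()].
func (m *memLog) Term(i uint64) (uint64, error) {
	off := m.offset()
	if i < off {
		return 0, errCompacted
	}
	if i-off >= uint64(len(m.ents)) {
		return 0, errUnavailable
	}
	return m.ents[i-off].Term, nil
}

// Entries returns entries in [lo, hi). maxSize caps the summed Data length
// (a simplification of the real size accounting), but at least one entry
// is returned when the range is non-empty.
func (m *memLog) Entries(lo, hi, maxSize uint64) ([]Entry, error) {
	off := m.offset()
	if lo <= off {
		return nil, errCompacted
	}
	if last, _ := m.LastIndex(); hi > last+1 {
		return nil, errUnavailable
	}
	var out []Entry
	var size uint64
	for _, e := range m.ents[lo-off : hi-off] {
		size += uint64(len(e.Data))
		if len(out) > 0 && size > maxSize {
			break
		}
		out = append(out, e)
	}
	return out, nil
}

// Append adds entries directly after LastIndex (no conflict handling here).
func (m *memLog) Append(ents ...Entry) { m.ents = append(m.ents, ents...) }

// Compact discards entries up to and including i; entry i becomes the dummy.
func (m *memLog) Compact(i uint64) {
	off := m.offset()
	dummy := Entry{Index: i, Term: m.ents[i-off].Term}
	m.ents = append([]Entry{dummy}, m.ents[i-off+1:]...)
}
```

After Compact(2), FirstIndex() is 3, but Term(2) still answers from the dummy entry. Raft needs exactly that term to check log matching for the entry that follows.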
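The Ready processing steps and the readyc/advancec handshake described above can be modelled with two unbuffered channels. This is a simplified sketch under stated assumptions, not the library's code. Ready, toyNode, run and handleReady here are hypothetical. Strings stand in for pb.Entry and pb.Message, and HardState, Snapshot and ApplyConfChange are left out.

```go
package main

// Ready is a pared-down, hypothetical stand-in for raft.Ready; strings
// replace pb.Entry and pb.Message to keep the sketch self-contained.
type Ready struct {
	Entries          []string // persist BEFORE sending Messages
	Messages         []string // send AFTER Entries are persisted
	CommittedEntries []string // apply to the state machine
}

// toyNode models only the unbuffered readyc/advancec handshake.
type toyNode struct {
	readyc   chan Ready
	advancec chan struct{}
}

func newToyNode() *toyNode {
	return &toyNode{readyc: make(chan Ready), advancec: make(chan struct{})}
}

func (n *toyNode) Ready() <-chan Ready { return n.readyc }

func (n *toyNode) Advance() { n.advancec <- struct{}{} }

// run hands out one batch at a time and blocks until Advance is called
// before producing the next one.
func (n *toyNode) run(batches []Ready) {
	for _, rd := range batches {
		n.readyc <- rd
		<-n.advancec
	}
	close(n.readyc)
}

// handleReady processes each batch in the order listed above (persist,
// send, apply, Advance) and records every action so the order can be checked.
func handleReady(n *toyNode) []string {
	var trace []string
	for rd := range n.Ready() {
		for _, e := range rd.Entries {
			trace = append(trace, "persist "+e) // stand-in for a durable write
		}
		for _, m := range rd.Messages {
			trace = append(trace, "send "+m) // stand-in for the transport
		}
		for _, e := range rd.CommittedEntries {
			trace = append(trace, "apply "+e) // stand-in for the state machine
		}
		n.Advance()
	}
	return trace
}
```

Both channels are unbuffered, so in this model the producer cannot hand out a second batch until the consumer has called Advance. Each batch is therefore fully handled before the next one is built.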
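The confc/confstatec round trip behind ApplyConfChange can be sketched the same way. ConfChange, ConfState, AddNode, RemoveNode and confNode below are hypothetical simplified types; the real ones live in raftpb and carry more fields. The sketch only shows that the caller blocks on confstatec until the run loop has applied the change.

```go
package main

import "sort"

// ConfChangeType, ConfChange and ConfState are hypothetical, simplified
// stand-ins for the raftpb types of similar names.
type ConfChangeType int

const (
	AddNode ConfChangeType = iota
	RemoveNode
)

type ConfChange struct {
	Type   ConfChangeType
	NodeID uint64
}

type ConfState struct{ Nodes []uint64 }

type confNode struct {
	confc      chan ConfChange // unbuffered, like node.confc
	confstatec chan ConfState  // carries the membership after each change
	peers      map[uint64]bool // owned by the run goroutine
}

// ApplyConfChange pushes cc to the run loop and waits for the resulting membership.
func (n *confNode) ApplyConfChange(cc ConfChange) ConfState {
	n.confc <- cc
	return <-n.confstatec
}

// run applies each change to the peer set and publishes the sorted membership.
func (n *confNode) run() {
	for cc := range n.confc {
		switch cc.Type {
		case AddNode:
			n.peers[cc.NodeID] = true
		case RemoveNode:
			delete(n.peers, cc.NodeID)
		}
		var ids []uint64
		for id := range n.peers {
			ids = append(ids, id)
		}
		sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
		n.confstatec <- ConfState{Nodes: ids}
	}
}
```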
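The tickc description says tick() behaves differently for a leader and a follower. A deterministic counter model makes that concrete. tickState and its fields are hypothetical. raft.Config exposes comparable ElectionTick and HeartbeatTick settings, and the real library randomizes a follower's election timeout, which this sketch does not.

```go
package main

type role int

const (
	follower role = iota
	leader
)

// tickState is a hypothetical model of what raft does on each tick().
// The election timeout here is fixed; etcd raft randomizes it.
type tickState struct {
	role          role
	elapsed       int // ticks since the last reset
	heartbeatTick int // leader: ticks between heartbeats
	electionTick  int // follower: ticks of silence before campaigning
	heartbeats    int // heartbeats broadcast so far (leader)
	campaigns     int // elections started so far (follower)
}

// tick advances logical time by one unit.
func (s *tickState) tick() {
	s.elapsed++
	switch s.role {
	case leader:
		if s.elapsed >= s.heartbeatTick {
			s.elapsed = 0
			s.heartbeats++ // would broadcast heartbeats to all peers
		}
	case follower:
		if s.elapsed >= s.electionTick {
			s.elapsed = 0
			s.campaigns++ // would start an election
		}
	}
}

// onLeaderMessage resets the follower's election timer when it hears from the leader.
func (s *tickState) onLeaderMessage() { s.elapsed = 0 }
```

In an application, a time.Ticker typically drives Node.Tick() at a fixed interval. The tick counts above therefore become wall-clock timeouts of roughly count times interval.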