【存储】etcd(4)-raft
创始人
2025-05-30 00:09:45
0

在前面几篇中,我们介绍了etcd存储相关的内容,包括预写日志、mvcc、事务等,可以认为对etcd单节点的存储有了相对全面的认识。但是etcd是一个基于raft协议实现的cp模型的分布式存储,只了解其状态机的工作原理是不够的。本文我们就来介绍etcd中的raft模块的具体实现。

关于raft协议本身,这里不做介绍,建议直接阅读原论文,这里给出中文翻译版。

最小实现原则

在介绍具体实现之前,我们先介绍一些软件设计上的内容。
etcd raft模块是基于开源的golang raft sdk实现。

该raft sdk基于最小实现原则,只实现了基本的功能,包括leader选举、日志处理、状态变更等逻辑,而raft运行所需要的存储层和传输层则依赖使用方自行实现。

其中存储层定义了storage接口用来管理raft log,同时提供了基本的实现raft.MemoryStorage,该实现是基于内存数组实现的非持久化的存储,在etcd系列的第一篇中提到过。用户也可以自行实现该接口,并作为参数传入。

raft节点间通信则完全依赖使用方实现,raft sdk没有做任何约束。该raft sdk仅通过channel对外输出要通信的消息,并对外提供方法来处理收到的消息。

该实现方式非常对我的胃口。我在工作中提供一些sdk给别的服务使用时,通常都会遵循最小实现原则。sdk中只实现基本的功能逻辑,sdk依赖的其他能力定义好接口,通过参数或者其他的方式进行注入。业务方在使用时,如果某项能力其本身已经具备,则只需要简单适配接口即可;如果不具备,则可以选择我提供相应实现。

相比于大而全的sdk实现方式,遵循最小实现原则的sdk实现方式可能会增加一些理解成本,但是不会引入冗余的依赖。同时,通过不同sdk的组合也可以更加灵活地对外提供丰富能力。

当然凡事不可一概而论,到底哪种方式更好还要看具体的场景。

说完设计原则,接下来会介绍具体的实现。raft sdk中按照分层的方式进行了实现,从底层到高层分别为raft -> rawNode -> node,我们会从底层开始依次介绍。

raft

raft对象是raft sdk的核心实现。其维护了raft节点的所有状态及参数,包括term、index、raft log、vote、peers state(leader对其他节点状态的追踪)、heartbeat、election timeout等raft必要的状态以及其他具体实现中的性能优化相关参数。同时,raft对象也实现了包括状态转换、日志追加、消息处理及发送等所有的raft节点所需要的方法。

raft的属性如下,我们挑选其中几个进行说明。

type raft struct {id uint64Term uint64Vote uint64readStates []ReadState// the lograftLog *raftLogmaxMsgSize         uint64maxUncommittedSize uint64// TODO(tbg): rename to trk.prs tracker.ProgressTrackerstate StateType// isLearner is true if the local raft node is a learner.isLearner boolmsgs []pb.Message// the leader idlead uint64// leadTransferee is id of the leader transfer target when its value is not zero.// Follow the procedure defined in raft thesis 3.10.leadTransferee uint64// Only one conf change may be pending (in the log, but not yet// applied) at a time. This is enforced via pendingConfIndex, which// is set to a value >= the log index of the latest pending// configuration change (if any). Config changes are only allowed to// be proposed if the leader's applied index is greater than this// value.pendingConfIndex uint64// an estimate of the size of the uncommitted tail of the Raft log. Used to// prevent unbounded log growth. Only maintained by the leader. Reset on// term changes.uncommittedSize uint64readOnly *readOnly// number of ticks since it reached last electionTimeout when it is leader// or candidate.// number of ticks since it reached last electionTimeout or received a// valid message from current leader when it is a follower.electionElapsed int// number of ticks since it reached last heartbeatTimeout.// only leader keeps heartbeatElapsed.heartbeatElapsed intcheckQuorum boolpreVote     boolheartbeatTimeout intelectionTimeout  int// randomizedElectionTimeout is a random number between// [electiontimeout, 2 * electiontimeout - 1]. It gets reset// when raft changes its state to follower or candidate.randomizedElectionTimeout intdisableProposalForwarding booltick func()step stepFunclogger Logger// pendingReadIndexMessages is used to store messages of type MsgReadIndex// that can't be answered as new leader didn't committed any log in// current term. Those will be handled as fast as first log is committed in// current term.pendingReadIndexMessages []pb.Message
}
  • raftlog
    raftlog是用来存储日志的部分,其构造如下。日志被追加到raft模块中时首先被会被添加的unstable中,当etcd将unstable中的日志追加至wal中以后,raft会将对应的日志追加到storage中,并从unstable中清除。storage就是第一小节中介绍的raft sdk定义的存储层接口,这里采用了raft.MemoryStorage的实现。
type raftLog struct {// storage contains all stable entries since the last snapshot.storage Storage// unstable contains all unstable entries and snapshot.// they will be saved into storage.unstable unstable// committed is the highest log position that is known to be in// stable storage on a quorum of nodes.committed uint64// applied is the highest log position that the application has// been instructed to apply to its state machine.// Invariant: applied <= committedapplied uint64logger Logger// maxNextCommittedEntsSize is the maximum number aggregate byte size of the// messages returned from calls to nextCommittedEnts.maxNextCommittedEntsSize uint64
}type unstable struct {// the incoming unstable snapshot, if any.snapshot *pb.Snapshot// all entries that have not yet been written to storage.entries []pb.Entryoffset  uint64logger Logger
}
  • maxMsgSize
    批量处理是非常常见的优化手段。raft在日志同步时就采用了批量处理的方式,一条消息携带多条日志。同时为了防止消息过大,设置了maxMsgSize参数。
  • prs
    raft中使用tracker.ProgressTracker来记录follower的状态,包括next index、commited index、active等。单独拆了一个小模块出来。
  • msgs
    raft需要发送的消息,会追加至msgs保存,算是某种程度的异步发送。当上层调用模块空闲时,会主动获取msgs然后进行发送。前面讲了,日志的同步是批量处理。这里msg的处理是异步批量处理。异步批量处理是常见的很有效的性能优化的手段。
  • prevote
    prevote也是实现中的一个优化。
    在raft算法中,follower在变为candidate时,会立刻将自身的term加一并发起选举。如果选举失败则进入election timeout然后重复该过程。正常情况下可以保证一轮选举一定会选出leader。但是在异常情况会存在问题。比如网络分区的情况下,某些节点的term会一直增长。当网络通信恢复时,其term会比leader大,这会导致leadership转移。
    针对上述问题,raft提供了prevote参数。当prevote为true时,选举时并不会直接将term加一,而是先发起prevote。当能拿到大多数选票时再将term加一并发起真正选举。
  • tick和step
    raft节点状态的驱动主要有两个地方,或者说两个方法。这里状态要和raft算法中的状态机区分,是指raft节点本身的状态,包括日志、任期、索引、节点的交互等。刚说了,raft节点的状态驱动有两个地方:一个是本身的计时,是节点自身的状态驱动,随着计时节点会根据角色不同有发送心跳、角色变更、开启选举等不同的行为;另一个是对外暴露的接口,以响应使用方的请求,同样的,针对同一种请求,不同角色的raft节点行为并不相同。抽象出来就是tick和step方法。
    针对类似上面描述的不同角色下行为不同的情况,通常会将接口抽象出来,针对不同的角色或者状态分别实现,在角色变更时设置相应的行为。这也是常见的设计思路。

介绍完属性,接下来再介绍相关的方法。对于方法,同样不会进行非常细节的介绍。因为相关方法里涉及到大量raft算法的逻辑实现,建议还是去看raft算法。我们会简单介绍主要方法的功能,然后关注一些在具体实现上的优化思路。

下面是raft发送消息相关的方法,最底层是send方法。我把send方法的具体实现贴了出来。可以看到,send方法只是将消息追加到msgs列表,以此实现异步批量处理。异步批量处理是常见的优化手段,可以极大的提升系统的吞吐和性能。但是在使用异步处理时必须要有所限制,必须对等待处理的消息的批次进行限制。
在send方法基础上,封装了sendAppend方法、sendHeartbeat方法,分别对指定的节点发送日志追加消息、发送心跳,以及在sendAppend和sendHeartbeat基础上封装广播方法。

func (r *raft) send(m pb.Message) {// 省略了参数校验r.msgs = append(r.msgs, m)
}func (r *raft) sendAppend(to uint64) {}func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {}func (r *raft) sendHeartbeat(to uint64, ctx []byte) {}func (r *raft) bcastAppend() {}func (r *raft) bcastHeartbeat() {}func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {}

下面是状态变化相关的方法。状态变化的方法比较简单,这里不做展开。只是在具体实现时增加了prevote的状态,这个在前面已经提到过。

func (r *raft) becomeFollower(term uint64, lead uint64) {}func (r *raft) becomeCandidate() {}func (r *raft) becomePreCandidate() {}func (r *raft) becomeLeader() {}func (r *raft) hup(t CampaignType) {}func (r *raft) campaign(t CampaignType) {}

下面是状态驱动的方法。前面也提到,raft节点的状态分别受自身的时钟驱动以及外界请求驱动。

时钟驱动来说,leader会在时钟驱动下发送心跳以及检查qurom;follower及candidate则在时钟驱动下进行状态转换并发起选举。同样,raft也实现了不同角色响应外界请求的方法。

// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {}// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
func (r *raft) tickHeartbeat() {}func (r *raft) Step(m pb.Message) error {}func stepLeader(r *raft, m pb.Message) erro {}func stepCandidate(r *raft, m pb.Message) error {}func stepFollower(r *raft, m pb.Message) error {}

RawNode

RawNode是在raft基础上的封装,其中最主要的一点我认为就是ready的封装。

// RawNode is a thread-unsafe Node.
// The methods of this struct correspond to the methods of Node and are described
// more fully there.
type RawNode struct {raft       *raftprevSoftSt *SoftStateprevHardSt pb.HardState
}

ready和advance是raft节点和状态机的交互机制。前面多次提到,raft的实现采用了异步批量处理。状态机会主动调用ready方法,获取等待处理的数据,并在处理完成后调用advance方法通知raft节点相应内容已经处理完成。
先看下ready中都包含哪些数据。ready中包含了的数据有:

  • unstable的日志条目,在etcd将其写入wal后,raft才会认为相应的日志为stable;
  • 已经commit但是尚未apply的日志,apply后raft节点会更新applied状态;
  • 待发送的msgs;
  • softstate和hardstate,分别包括raft节点的状态、以及term、index、vote;
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {rd := Ready{Entries:          r.raftLog.unstableEntries(),CommittedEntries: r.raftLog.nextCommittedEnts(),Messages:         r.msgs,}if softSt := r.softState(); !softSt.equal(prevSoftSt) {rd.SoftState = softSt}if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {rd.HardState = hardSt}if r.raftLog.unstable.snapshot != nil {rd.Snapshot = *r.raftLog.unstable.snapshot}if len(r.readStates) != 0 {rd.ReadStates = r.readStates}rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))return rd
}

状态机在相应处理后会调用advance通知raft节点。

func (r *raft) advance(rd Ready) {r.reduceUncommittedSize(rd.CommittedEntries)if newApplied := rd.appliedCursor(); newApplied > 0 {r.raftLog.appliedTo(newApplied)if r.prs.Config.AutoLeave && newApplied >= r.pendingConfIndex && r.state == StateLeader {m, err := confChangeToMsg(nil)if err != nil {panic(err)}if err := r.Step(m); err != nil {r.logger.Debugf("not initiating automatic transition out of joint configuration %s: %v", r.prs.Config, err)} else {r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config)}}}if len(rd.Entries) > 0 {e := rd.Entries[len(rd.Entries)-1]if r.id == r.lead {_ = r.Step(pb.Message{From: r.id, Type: pb.MsgAppResp, Index: e.Index})}r.raftLog.stableTo(e.Index, e.Term)}if !IsEmptySnap(rd.Snapshot) {r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)}
}

Node

node仅是在rawNode上封装了一些chan用来做交互,不做介绍。

// node is the canonical implementation of the Node interface
type node struct {propc      chan msgWithResultrecvc      chan pb.Messageconfc      chan pb.ConfChangeV2confstatec chan pb.ConfStatereadyc     chan Readyadvancec   chan struct{}tickc      chan struct{}done       chan struct{}stop       chan struct{}status     chan chan Statusrn *RawNode
}

以上即是对raft部分的介绍,主要侧重在raft sdk的代码设计以及性能优化方面。一些技术细节以及连接层等没有提及,后面会再开一篇补充说明。

相关内容

热门资讯

【MySQL】锁 锁 文章目录锁全局锁表级锁表锁元数据锁(MDL)意向锁AUTO-INC锁...
【内网安全】 隧道搭建穿透上线... 文章目录内网穿透-Ngrok-入门-上线1、服务端配置:2、客户端连接服务端ÿ...
GCN的几种模型复现笔记 引言 本篇笔记紧接上文,主要是上一篇看写了快2w字,再去接入代码感觉有点...
数据分页展示逻辑 import java.util.Arrays;import java.util.List;impo...
Redis为什么选择单线程?R... 目录专栏导读一、Redis版本迭代二、Redis4.0之前为什么一直采用单线程?三、R...
【已解决】ERROR: Cou... 正确指令: pip install pyyaml
关于测试,我发现了哪些新大陆 关于测试 平常也只是听说过一些关于测试的术语,但并没有使用过测试工具。偶然看到编程老师...
Lock 接口解读 前置知识点Synchronized synchronized 是 Java 中的关键字,...
Win7 专业版安装中文包、汉... 参考资料:http://www.metsky.com/archives/350.htm...
3 ROS1通讯编程提高(1) 3 ROS1通讯编程提高3.1 使用VS Code编译ROS13.1.1 VS Code的安装和配置...
大模型未来趋势 大模型是人工智能领域的重要发展趋势之一,未来有着广阔的应用前景和发展空间。以下是大模型未来的趋势和展...
python实战应用讲解-【n... 目录 如何在Python中计算残余的平方和 方法1:使用其Base公式 方法2:使用statsmod...
学习u-boot 需要了解的m... 一、常用函数 1. origin 函数 origin 函数的返回值就是变量来源。使用格式如下...
常用python爬虫库介绍与简... 通用 urllib -网络库(stdlib)。 requests -网络库。 grab – 网络库&...
药品批准文号查询|药融云-中国... 药品批文是国家食品药品监督管理局(NMPA)对药品的审评和批准的证明文件...
【2023-03-22】SRS... 【2023-03-22】SRS推流搭配FFmpeg实现目标检测 说明: 外侧测试使用SRS播放器测...
有限元三角形单元的等效节点力 文章目录前言一、重新复习一下有限元三角形单元的理论1、三角形单元的形函数(Nÿ...
初级算法-哈希表 主要记录算法和数据结构学习笔记,新的一年更上一层楼! 初级算法-哈希表...
进程间通信【Linux】 1. 进程间通信 1.1 什么是进程间通信 在 Linux 系统中,进程间通信...
【Docker】P3 Dock... Docker数据卷、宿主机与挂载数据卷的概念及作用挂载宿主机配置数据卷挂载操作示例一个容器挂载多个目...