Raft
6.824 的框架将实现 Raft 分为了四个
Part,分别是领导者选举、日志同步、状态持久化和日志压缩。
领导者选举(Part A)
这部分主要实现 Raft 选举和心跳逻辑(不携带日志的
AppendEntries),主要的任务:
选出「唯一」 的领导者,领导者选出后会持续进行心跳避免其他人发出选举;
旧的领导者宕机或者网络故障无法触达时,选出新的领导者
将 PartA 的实现分为三个部分:
状态转换:角色定义和转换函数
选举逻辑:定义选举 RPC,构造周期性选举的 Loop
心跳逻辑:定义心跳 RPC,当选 Leader 后发送心跳
先根据论文来定义需要用的基本数据结构:
type Raft struct { mu sync.Mutex peers []*labrpc.ClientEnd persister *Persister me int dead int32 role Role currentTerm int votedFor int electionStart time.Time electionTimeout time.Duration }
在工程实践中,currentTerm
一般会建议用指定位数的整形,比如 int32
。
修改函数 GetState
,让测试框架能够正确拿到 Raft
节点的相关状态:
func (rf *Raft) GetState() (int , bool ) { rf.mu.Lock() defer rf.mu.Unlock() return rf.currentTerm, rf.role == Leader }
状态转换
状态是针对某一个节点 Peer 来说的,先定义一下角色类型:
type Role string const ( Follower Role = "Follower" Candidate Role = "Candidate" Leader Role = "Leader" )
为了实现这个状态机,可以定义三个转换函数:becomeCandidate
,becomeLeader
和 becomeFollower
。
func (rf *Raft) becomeFollowerLocked(term int ) { if term < rf.currentTerm { LOG(rf.me, rf.currentTerm, DError, "Can't become Follower, lower term" ) return } LOG(rf.me, rf.currentTerm, DLog, "%s -> Follower, For T%d->T%d" , rf.role, rf.currentTerm, term) if term > rf.currentTerm { rf.votedFor = -1 } rf.role = Follower rf.currentTerm = term }func (rf *Raft) becomeCandidateLocked() { if rf.role == Leader { LOG(rf.me, rf.currentTerm, DError, "Leader can't become Candidate" ) return } LOG(rf.me, rf.currentTerm, DVote, "%s -> Candidate, For T%d->T%d" , rf.role, rf.currentTerm, rf.currentTerm+1 ) rf.role = Candidate rf.currentTerm++ rf.votedFor = rf.me }func (rf *Raft) becomeLeaderLocked() { if rf.role != Candidate { LOG(rf.me, rf.currentTerm, DLeader, "%s, Only candidate can become Leader" , rf.role) return } LOG(rf.me, rf.currentTerm, DLeader, "%s -> Leader, For T%d" , rf.role, rf.currentTerm) rf.role = Leader }
由于涉及对 Raft
的全局状态进行修改,因此需要加锁,且希望在函数外部加锁,所以函数名都带有
Locked 。
选举 Loop
Candidate 要票的整体逻辑:
选举 Loop:负责超时检查,来定时发送选举 RPC;
单轮选举:变为 Candidate 后,向所有除自己的 Peer
之外发起一次要票过程;
单次 RPC:针对每个 Peer 的 RequestVote 的请求响应进行处理;
选举
Loop :基本逻辑是在每次循环时,进行两项检查,「超时检查」和「角色检查」。
超时检查:检查 选举 Timer
是否已经超时,只有超时才会真正发起选举,这里要注意的是在实现上会增加一个随机值,避免同意的超时间隔时间而导致一同发起选举。
角色检查:判断是否为 Leader,如果自己已经是 Leader
不用发起选举。
第一步要实现的是设置随机超时时间和超时检测函数:
const ( electionTimeoutMin time.Duration = 250 * time.Millisecond electionTimeoutMax time.Duration = 400 * time.Millisecond )func (rf *Raft) resetElectionTimerLocked() { rf.electionStart = time.Now() randRange := int64 (electionTimeoutMax - electionTimeoutMin) rf.electionTimeout = electionTimeoutMin + time.Duration(rand.Int63()%randRange) }func (rf *Raft) isElectionTimeoutLocked() bool { return time.Since(rf.electionStart) > rf.electionTimeout }
选举 Loop :当满足两个条件时,将角色装变为
Candidate
,然后异步的发起选举(同步会造成主循环检查延迟):
func (rf *Raft) electionTicker() { for !rf.killed() { rf.mu.Lock() if rf.role != Leader && rf.isElectionTimeoutLocked() { rf.becomeCandidateLocked() go rf.startElection(rf.currentTerm) } rf.mu.Unlock() ms := 50 + (rand.Int63() % 300 ) time.Sleep(time.Duration(ms) * time.Millisecond) } }
所有需要用到 Raft 全局变量的地方都需要加锁,但是不要在加锁的时候发送
RPC,这会造成长时间占用锁,同时增加发生死锁的可能性。
该 Loop 的生命周期和 Raft Peer 相同,在创建 Raft
实例时就在后台开始运行 :
func Make (peers []*labrpc.ClientEnd, me int , persister *Persister, applyCh chan ApplyMsg) *Raft { go rf.electionTicker() return rf }
单轮选举 :一轮选举需要对除自己以外所有的 Peer
都发起一轮 RPC 投票请求,由于需要访问全局变量,所以仍然需要加锁。
func (rf *Raft) startElection(term int ) bool { votes := 0 askVoteFromPeer := func (peer int , args *RequestVoteArgs) { } rf.mu.Lock() defer rf.mu.Unlock() if rf.contextLostLocked(Candidate, term) { return false } for peer := 0 ; peer < len (rf.peers); peer++ { if peer == rf.me { votes++ continue } args := &RequestVoteArgs{ Term: term, CandidateId: rf.me, } go askVoteFromPeer(peer, args) } return true }
上下文检查 :这里的上下文指的是 Term
和
Role
,也就是说在一个任期内,只要角色没有发生变化,就可以推进状态机:
func (rf *Raft) contextLostLocked(role Role, term int ) bool { return !(rf.currentTerm == term && rf.role == role) }
如果上下文已经被更改,那么就应该及时的退出
goroutine,避免对状态机做出错误的改动。
单次 RPC :对每个 Peer 发送 RPC 请求,包括
构造 RPC 参数 、发送 RPC
请求 、等待结果并处理结果 三个部分。
askVoteFromPeer := func (peer int , args *RequestVoteArgs) { reply := &RequestVoteReply{} ok := rf.sendRequestVote(peer, args, reply) rf.mu.Lock() defer rf.mu.Unlock() if !ok { LOG(rf.me, rf.currentTerm, DDebug, "Ask vote from %d, Lost or error" , peer) return } if reply.Term > rf.currentTerm { rf.becomeFollowerLocked(reply.Term) return } if rf.contextLostLocked(Candidate, term) { LOG(rf.me, rf.currentTerm, DVote, "Lost context, abort RequestVoteReply in T%d" , rf.currentTerm) return } if reply.VoteGranted { votes++ } if votes > len (rf.peers)/2 { rf.becomeLeaderLocked() go rf.replicationTicker(term) } }
在进行 RPC 的时候,要随时进行对齐任期的操作。只有 Term
相同,才可以进行状态机的更新,否则就要优先对齐 Term:
如果对方 Term 比自己小:无视请求,通过返回值表达自己的 Term;
如果对方 Term 比自己大:跟上对方的 Term,维持 Follower;
对齐 Term
之后,还需要检查上下文。这里就有一个问题,什么时候需要进行上下文的检查?如果一段逻辑在一把锁的保护下不断的做,那并不需要检查上下文。但这里需要进行一些耗时的操作(如等待
RPC),此时需要把锁临时断开。之后,重新上锁的时候,就需要考虑状态是否满足之前期望的状态。因为锁中间断开了,有些状态可能被其他线程修改了。
所有 Peer 在运行时都会收到要票的 RPC,此时就会执行
RequestVote
回调函数,也就是 Peer 收到 RPC
要票请求时的处理逻辑:
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { rf.mu.Lock() defer rf.mu.Unlock() reply.Term = rf.currentTerm if rf.currentTerm > args.Term { LOG(rf.me, rf.currentTerm, DVote, "-> S%d, Reject vote, higher term, T%d>T%d" , args.CandidateId, rf.currentTerm, args.Term) reply.VoteGranted = false return } if rf.currentTerm < args.Term { rf.becomeFollowerLocked(args.Term) } if rf.votedFor != -1 && rf.votedFor != args.CandidateId { LOG(rf.me, rf.currentTerm, DVote, "-> S%d, Reject, Already voted S%d" , args.CandidateId, rf.votedFor) reply.VoteGranted = false return } reply.VoteGranted = true rf.votedFor = args.CandidateId rf.resetElectionTimerLocked() LOG(rf.me, rf.currentTerm, DVote, "-> S%d" , args.CandidateId) }
回调函数实现的第一点是先做对齐 Term,在对齐 Term 的过程中,Peer
有可能重置
votedFor。这样即使本来由于已经投过票了而不能再投票,但提高任期重置后,在新的
Term 里,就又有一票可以投了。
另外一点是,论文提到只有投票给对方后,才能重置选举
Timer 。也就是说,在没有投出票时,是不允许重置选举 Timer 的。
心跳逻辑
「心跳」 和「日志复制」 是相同的
RPC,他们的区别在于是否携带日志。和选举逻辑相对,也分三个层次来实现 RPC
发送方:
心跳 Loop:当选 Leader 需要不断的向其他 Peer 发送心跳包;
单论心跳:对除自己以外所有的 Peer 发送一个心跳 RPC;
单次 RPC:对某个 Peer 发送心跳包,处理 RPC 返回值;
心跳 Loop :由于不需要构造随机超时机制,因此心跳 Loop
会比选举 Loop 简单一些:
func (rf *Raft) replicationTicker(term int ) { for !rf.killed() { ok := rf.startReplication(term) if !ok { return } time.Sleep(replicateInterval) } }
startReplication
有一个返回值,用于检测是否还处于「上下文」的环境中,如果一旦发现 Raft
已经不是这个 Term 的 Leader 了,就需要退出 Loop,不再广播心跳包。
单轮心跳 :Leader 会给除自己外的所有其他 Peer
发送心跳。在发送前要检测“上下文”是否还在,如果不在了,就直接返回
false,告诉外层循环 replicationTicker
可以终止循环了。因此,startReplication
的返回值也可以视作是否成功的发起了一轮心跳。
func (rf *Raft) startReplication(term int ) bool { replicateToPeer := func (peer int , args *AppendEntriesArgs) { } rf.mu.Lock() defer rf.mu.Unlock() if rf.contextLostLocked(Leader, term) { LOG(rf.me, rf.currentTerm, DLeader, "Leader[T%d] -> %s[T%d]" , term, rf.role, rf.currentTerm) return false } for peer := 0 ; peer < len (rf.peers); peer++ { if peer == rf.me { continue } args := &AppendEntriesArgs{ Term: term, LeaderId: rf.me, } go replicateToPeer(peer, args) } return true }
单次
RPC :在不处理日志时,心跳的返回值比较简单,只需要对齐 Term
即可。
replicateToPeer := func (peer int , args *AppendEntriesArgs) { reply := &AppendEntriesReply{} ok := rf.sendAppendEntries(peer, args, reply) rf.mu.Lock() defer rf.mu.Unlock() if !ok { LOG(rf.me, rf.currentTerm, DLog, "-> S%d, Lost or crashed" , peer) return } if reply.Term > rf.currentTerm { rf.becomeFollowerLocked(reply.Term) return } }
心跳接收方在收到心跳包时,只要 Leader 的 Term
不小于自己,就对其进行认可,变为 Follower,并重置选举时钟。
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) { rf.mu.Lock() defer rf.mu.Unlock() reply.Term = rf.currentTerm reply.Success = false if args.Term < rf.currentTerm { LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject log" , args.LeaderId) return } rf.becomeFollowerLocked(args.Term) rf.resetElectionTimerLocked() reply.Success = true }
日志同步(Part B)
在 Part A 的基础上,加上对日志的复制,主要的任务有:
在进行领导选举时,加入日志的比较
领导者收到应用层发来的日志后,要通过心跳同步给所有的 Follower
在收到多数 Follower 同步成功的请求后,Leader 要推进
CommitIndex,并让所有 Peer Apply
先对 AppendEntries
结构体进行完善,按照论文的 Figure 2
来补全2 RPC 涉及到的结构体,根据 ApplyMsg
所需字段,定义
LogEntry
:
type LogEntry struct { Term int CommandValid bool Command interface {} }type AppendEntriesArgs struct { Term int LeaderId int PrevLogIndex int PrevLogTerm int Entries []LogEntry }
完善 Raft
中相关的字段,相对应字段的初始化这里就不再赘述。
nextIndex []int matchIndex []int
matchIndex 和 nextIndex 是 Leader 节点用来管理 Follower
日志同步的两个关键字段,他们的作用分别是:
nextIndex:预期下一次要发送给 Follower 的日志条目索引(初始值为
Leader 的最后日志索引 +1)。
matchIndex:已知 Follower 已复制的最高日志条目索引(初始值为
0,表示未匹配)。
一个 Peer 收到 RPC,需要做以下判断来决定是否能够增加日志:
如果 prevLog
不匹配,则返回
Success = false
;
如果 prevLog
匹配,则将参数中 Entries
追加到本地日志,返回 Success = true
;
所谓的日志匹配就是「相同 Index 的地方,Term 相同」 ,index 和
term 能唯一确定一条日志。Raft 保证一个 Term 中最多有(也可能没有)一个
Leader,然后只有该 Leader 能确定日志顺序且同步日志。
为了比较 Follower 和 Leader 的日志一致性,
PrevLogTerm
、PrevLogIndex
,主要用于检测在
Leader 的视图里,Follower 的日志是否正确。这两个字段分别代表:
Leader 发送日志条目时,Follower
前一个日志条目的索引(用于定位插入位置)。
Leader 发送日志条目时,Follower
前一个日志条目的任期号(用于验证一致性)。
之前的心跳只是负责压制其他的 Peer 来发起选举,因此不用给 Leader 返回
reply,但日志复制就需要返回 reply 了。
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) { rf.mu.Lock() defer rf.mu.Unlock() LOG(rf.me, rf.currentTerm, DDebug, "<- S%d, Receive log, Prev=[%d]T%d, Len()=%d" , args.LeaderId, args.PrevLogIndex, args.PrevLogTerm, len (args.Entries)) reply.Term = rf.currentTerm reply.Success = false if args.Term < rf.currentTerm { LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject Log, Higher term, T%d<T%d" , args.LeaderId, args.Term, rf.currentTerm) return } if args.Term >= rf.currentTerm { rf.becomeFollowerLocked(args.Term) } if args.PrevLogIndex >= len (rf.log) { LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject Log, Follower log too short, Len:%d <= Prev:%d" , args.LeaderId, len (rf.log), args.PrevLogIndex) return } if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm { LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject Log, Prev log not match, [%d]: T%d != T%d" , args.LeaderId, args.PrevLogIndex, rf.log[args.PrevLogIndex].Term, args.PrevLogTerm) return } conflictLogIndex := args.PrevLogIndex + 1 conflictEntryIdx := 0 for ; conflictLogIndex < len (rf.log) && conflictEntryIdx < len (args.Entries); conflictLogIndex++, conflictEntryIdx++ { if rf.log[conflictLogIndex].Term != args.Entries[conflictEntryIdx].Term { break } } if conflictEntryIdx < len (args.Entries) { rf.log = append (rf.log[:conflictLogIndex], args.Entries[conflictEntryIdx:]...) LOG(rf.me, rf.currentTerm, DLog2, "Follower append logs: (%d, %d]" , conflictLogIndex-1 , args.PrevLogIndex+len (args.Entries)) } reply.Success = true rf.resetElectionTimerLocked() }
对于日志复制 RPC 的发送方来说,需要增加两部分逻辑:
每个 RPC 发送前的参数构造;
每个 RPC 返回值处理:
如果复制成功,看看是否能够更新 Leader 的
commitIndex
如果复制失败,需要将匹配点回退,继续试探
匹配点回退使用的是快速回溯算法,核心思想是快速回退到冲突任期(Term)的开始位置 ,从而高效解决
Leader 和 Follower 之间的日志不一致问题。它的设计目标是 减少不必要的 RPC
重试次数,直接跳过整个冲突任期的所有条目,而非逐条回退(线性回溯)。
func (rf *Raft) startReplication(term int ) bool { replicateToPeer := func (peer int , args *AppendEntriesArgs) { reply := &AppendEntriesReply{} ok := rf.sendAppendEntries(peer, args, reply) rf.mu.Lock() defer rf.mu.Unlock() if !ok { LOG(rf.me, rf.currentTerm, DLog, "-> S%d, Lost or crashed" , peer) return } if reply.Term > rf.currentTerm { rf.becomeFollowerLocked(reply.Term) return } if !reply.Success { idx := rf.nextIndex[peer] - 1 term := rf.log[idx].Term for idx > 0 && rf.log[idx].Term == term { idx-- } rf.nextIndex[peer] = idx + 1 LOG(rf.me, rf.currentTerm, DLog, "Log not matched in %d, Update next=%d" , args.PrevLogIndex, rf.nextIndex[peer]) return } rf.matchIndex[peer] = args.PrevLogIndex + len (args.Entries) rf.nextIndex[peer] = rf.matchIndex[peer] + 1 } rf.mu.Lock() defer rf.mu.Unlock() if rf.contextLostLocked(Leader, term) { LOG(rf.me, rf.currentTerm, DLog, "Lost Leader[%d] to %s[T%d]" , term, rf.role, rf.currentTerm) return false } for peer := 0 ; peer < len (rf.peers); peer++ { if peer == rf.me { rf.matchIndex[peer] = len (rf.log) - 1 rf.nextIndex[peer] = len (rf.log) continue } prevIdx := rf.nextIndex[peer] - 1 prevTerm := rf.log[prevIdx].Term args := &AppendEntriesArgs{ Term: rf.currentTerm, LeaderId: rf.me, PrevLogIndex: prevIdx, PrevLogTerm: prevTerm, Entries: append ([]LogEntry(nil ), rf.log[prevIdx+1 :]...), LeaderCommit: rf.commitIndex, } LOG(rf.me, rf.currentTerm, DDebug, "-> S%d, Send log, Prev=[%d]T%d, Len()=%d" , peer, args.PrevLogIndex, args.PrevLogTerm, len (args.Entries)) go replicateToPeer(peer, args) } return true }
这部分的最终目的,就是要更新 matchIndex。进而依据所有 Peer 的
matchIndex 来算 commitIndex 。Leader 有了 commitIndex
之后,再将其下发给各个 Follower,指导其各自更新本地 commitIndex 进而
apply。
除此之外,在当选 Leader 时,就需要更新对其他 Peer 的视图。
func (rf *Raft) becomeLeaderLocked() { if rf.role != Candidate { LOG(rf.me, rf.currentTerm, DError, "Only Candidate can become Leader" ) return } LOG(rf.me, rf.currentTerm, DLeader, "Become Leader in T%d" , rf.currentTerm) rf.role = Leader for peer := 0 ; peer < len (rf.peers); peer++ { rf.nextIndex[peer] = len (rf.log) rf.matchIndex[peer] = 0 } }
在有了日志的概念后,在进行 Leader
选举的时候,就需要进行日志比较了。在进行选举的时候,确保只有具有比大多数
Peer 更新日志的候选人才能当选 Leader。
那么因此会引申出一个问题,对于两个 Peer
来说,谁的日志才算是最新的。论文的描述是这样的:
Term 高的 Peer 日志越新;
Term 相同,Index 大的 Peer 日志越新;
func (rf *Raft) isMoreUpDateLocked(candidateIndex, candidateTerm int ) bool { l := len (rf.log) lastIndex, lastTerm := l-1 , rf.log[l-1 ].Term LOG(rf.me, rf.currentTerm, DVote, "Compare last log, Me: [%d]T%d, Candidate: [%d]T%d" , lastIndex, lastTerm, candidateIndex, candidateTerm) if lastTerm != candidateTerm { return lastTerm > candidateTerm } return lastIndex > candidateIndex }
在投票 RPC 中最后一条日志的 Term 和 Index 信息,用于日志比较:
type RequestVoteArgs struct { Term int CandidateId int LastLogIndex int LastLogTerm int }
在构造 RPC 请求的参数时,加上这两个字段:
args := &RequestVoteArgs{ Term: rf.currentTerm, CandidateId: rf.me, LastLogIndex: l-1 , LastLogTerm: rf.log[l-1 ].Term, }
接收方在进行对齐 Term,如果没有投过票,通过日志比较来决定是否能给该
Peer 投票。
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { if rf.votedFor != -1 && rf.votedFor != args.CandidateId { LOG(rf.me, rf.currentTerm, DVote, "-> S%d, Reject Voted, Already voted to S%d" , args.CandidateId, rf.votedFor) return } if rf.isMoreUpToDateLocked(args.LastLogIndex,args.LastLogTerm) { LOG(rf.me, rf.currentTerm, DVote, "-> S%d, Reject Vote, S%d's log less up-to-date" , args.CandidateId) return } }
下面实现关于日志 Apply 的逻辑,也就是 Leader 提交日志的部分。Apply
只有在 commitIndex
增大的时候才会触发,因此可以利用一下
golang 的语法 sync.Cond
,使用唤醒机制,在
commitIndex
增大后唤醒 applyTicker
。
根据 Figure 2 中补充两个字段:
commitIndex:全局日志提交进度
lastApplied:本地 Peer 日志提交进度
applyCond:用于唤醒 applyTicker
applyCh:将 Apply 工作流构造的 Msg 传递给应用层
ApplyTicker 工作流的具体步骤:
构造所有待 apply 的 ApplyMsg
遍历这些 msgs,进行 apply
更新 lastApplied
func (rf *Raft) applyTicker() { for !rf.killed() { rf.mu.Lock() rf.applyCond.Wait() entries := make ([]LogEntry, 0 ) for i := rf.lastApplied + 1 ; i <= rf.commitIndex; i++ { entries = append (entries, rf.log[i]) } rf.mu.Unlock() for i, entry := range entries { rf.applyCh <- ApplyMsg{ CommandValid: entry.CommandValid, Command: entry.Command, CommandIndex: rf.lastApplied + 1 + i, } } rf.mu.Lock() LOG(rf.me, rf.currentTerm, DApply, "Apply log for [%d, %d]" , rf.lastApplied+1 , rf.lastApplied+len (entries)) rf.lastApplied += len (entries) rf.mu.Unlock() } }
在 Leader 给其他 Peer AppendEntries
成功后,会更新
rf.matchIndex
。
replicateToPeer := func (peer int , args *AppendEntriesArgs) { rf.matchIndex[peer] = args.PrevLogIndex + len (args.Entries) rf.nextIndex[peer] = rf.matchIndex[peer] + 1 majorityMatched := rf.getMajorityIndexLocked() if majorityMatched > rf.commitIndex { LOG(rf.me, rf.currentTerm, DApply, "Leader update the commit index %d->%d" , rf.commitIndex, majorityMatched) rf.commitIndex = majorityMatched rf.applyCond.Signal() } }
在每次更新 matchIndex
后,依据此全局匹配点视图,可以算出多数 Peer 的匹配点,进而更新 Leader 的
CommitIndex
。这里使用排序后找中位数的方法。由于排序会改变原数组,因此要把
matchIndex
复制一份再进行排序。
func (rf *Raft) getMajorityIndexLocked() int { tmpIndexes := make ([]int , len (rf.matchIndex)) copy (tmpIndexes, rf.matchIndex) sort.Ints(tmpIndexes) majorityIdx := (len (tmpIndexes) - 1 ) / 2 LOG(rf.me, rf.currentTerm, DDebug, "Match index after sort: %v, majority[%d]=%d" , tmpIndexes, majorityIdx, tmpIndexes[majorityIdx]) return tmpIndexes[majorityIdx] }
如果 commitIndex
更新,则唤醒 apply 工作流,可以将 msg
更新到本地日志了。Leader 通过下一次的 AppendEntries
的 RPC
参数将 commitIndex
发送给每个 Follower,因此需要再
AppendEntriesArgs
增加这个参数。
每个 Follower 通过 AppendEntries 的回调函数收到 Leader 发来的
LeaderCommit,来更新本地的 CommitIndex,进而驱动 Apply
工作流开始干活。
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) { if args.LeaderCommit > rf.commitIndex { LOG(rf.me, rf.currentTerm, DApply, "Follower update the commit index %d->%d" , rf.commitIndex, args.LeaderCommit) rf.commitIndex = args.LeaderCommit if rf.commitIndex >= len (rf.log) { rf.commitIndex = len (rf.log) - 1 } rf.applyCond.Signal() } }
状态持久化(Part C)
目前能够做到在「不宕机」下的领导者选举。但是在某个 Peer
异常重启后,是不能正常重新加入集群的。为此需要将 Raft
的关键信息定时持久化,重启后加载,以保证重新加入集群。
主要是两个序列化和反序列化函数:
func (rf *Raft) persistString() string { return fmt.Sprintf("T%d, VotedFor: %d, Log: [0: %d)" , rf.currentTerm, rf.votedFor, len (rf.log)) }func (rf *Raft) persistLocked() { w := new (bytes.Buffer) e := labgob.NewEncoder(w) e.Encode(rf.currentTerm) e.Encode(rf.votedFor) e.Encode(rf.log) raftstate := w.Bytes() rf.persister.Save(raftstate, nil ) }func (rf *Raft) readPersist(data []byte ) { if data == nil || len (data) < 1 { return } var currentTerm int var votedFor int var log []LogEntry r := bytes.NewBuffer(data) d := labgob.NewDecoder(r) if err := d.Decode(¤tTerm); err != nil { LOG(rf.me, rf.currentTerm, DPersist, "Read currentTerm error: %v" , err) return } rf.currentTerm = currentTerm if err := d.Decode(&votedFor); err != nil { LOG(rf.me, rf.currentTerm, DPersist, "Read votedFor error: %v" , err) return } rf.votedFor = votedFor if err := d.Decode(&log); err != nil { LOG(rf.me, rf.currentTerm, DPersist, "Read log error: %v" , err) return } rf.log = log LOG(rf.me, rf.currentTerm, DPersist, "Read Persist %v" , rf.persistString()) }
主要需要持久化的有三个字段:currentTerm
,votedFor
和 log
,为什么只需要持久化这三个字段:
currentTerm
:重启后一定要知道自己之前任期到了哪里,因为任期是状态机中正确行为的基础;
votedFor
:如果在某个任期已经投过票了,重启之后不能投票,否则会多一票;
log
:日志作为数据,理所当然要持久化;
另外 persist
函数需要访问全局状态,因此需要在临界区调用,不然会出现竞争。在所有修改全局状态的地方,修改完成之后都需要做一下持久化。
日志回溯优化
之前在 Leader
发送同步日志时,采用的是回退的方式,而每次回退都需要下一个 RPC
才能发送给 Follower,也就是说每次回退都要经过一个
replicateInterval
。如果回退算法不够好,某些情况下,冲突探测时间会特别长。「Leader
只基于自己的日志进行回退的情况下,一次不管是回退一个 index 还是回退一个
term 效果都不好」 。因此,可以考虑让 Follower 提供一些信息,告诉
Leader 自己的日志目前到哪里了。
给 AppendEntriesReply
增加两个额外的字段,让 Follower
携带一些日志冲突的信息。
type AppendEntriesReply struct { Term int Success bool ConflictIndex int ConflictTerm int }
Follower 算法大致流程:
如果 Follower 日志过短,那么 ConflictTerm
置空,ConflictIndex = len(rf.log)
;
否则,将 ConflictTerm
设置为 Follower 在
Leader.PrevLogIndex
处日志的
Term;ConflictIndex
为 ConflictTerm
的第一条日志;
第一条做法的目的在于,如果 Follower 日志过短,可以提示 Leader
迅速回退到 Follower 日志的末尾。第二条做法的目的在于,如果 Follower 存在
Leader.PrevLog
,但不匹配,则将对应的 Term
的日志全部跳过。
if args.PrevLogIndex >= len (rf.log) { reply.ConflictIndex = len (rf.log) reply.ConflictTerm = InvalidTerm return }if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm { reply.ConflictTerm = rf.log[args.PrevLogIndex].Term reply.ConflictIndex = rf.firstLogFor(reply.ConflictTerm) return }
Leader 端使用上面两个新增字段的算法如下:
如果 ConflictTerm
为空,说明 Follower 日志太短,直接将
nextIndex
赋值为 ConflictIndex 迅速回退到 Follower
日志末尾。
否则,以 Leader 日志为准,跳过 ConflictTerm
的所有日志;如果发现 Leader 日志中不存在 ConflictTerm
的任何日志,则以 Follower 为准跳过 ConflictTerm
,即使用
ConflictIndex
。
if !reply.Success { prevNext := rf.nextIndex[peer] if reply.ConflictTerm == InvalidTerm { rf.nextIndex[peer] = reply.ConflictIndex } else { firstTermIndex := rf.firstLogFor(reply.ConflictTerm) if firstTermIndex != InvalidIndex { rf.nextIndex[peer] = firstTermIndex } else { rf.nextIndex[peer] = reply.ConflictIndex } } rf.nextIndex[peer] = MinInt(prevNext, rf.nextIndex[peer]) return }
空 Term 和 空 Index 可以用一个特殊值来表示。
封装一个函数用于在日志中找到指定 term 的第一条日志。
func (rf *Raft) firstLogFor(term int ) int { for i, entry := range rf.log { if entry.Term == term { return i } else if entry.Term > term { break } } return InvalidIndex }
为了避免 Reply 乱序到达,导致探测点乱序前进和回退,需要保证
nextIndex
是单调递减的。
if !reply.Success { prevIndex := rf.nextIndex[peer] if rf.nextIndex[peer] > prevIndex { rf.nextIndex[peer] = prevIndex } return }
日志压缩(Part D)
对于一个长时间运行的 Raft
系统,如果持续收到日志回会遇到「空间不足」 ,「启动过慢」 等问题。日志无限的追加下去,本地硬盘空间可能存不下。重启时需要重放所有日志,如果日志过长,重放过程将会持续很久不能正常对外提供服务。
最容易想到的方法就是定期对日志做快照 。针对某个日志
entry 做了快照之后,该 entry
及之前的日志都可以被截断 。为什么需要这么做呢?相比于日志,快照的存储更加紧凑。日志记录的是对状态机修改的事件,比如
update k1 = v1
,而快照通常记录的是
k1: v1
,一条数据会在日志中出现多次,但是在快照中只会保存其最后一次的状态。
截断的同时也会带来新的问题,如果一个 term
较低的节点加入了集群,Leader 需要发送过往的日志给该
Follower,发现过往日志已经被截断了。这时候就需要引入一个新的 RPC
InstallSnapshot
,将 Leader 的 Snapshot 全量同步给
Follower,再做之后的增量同步。
下面只贴出关键代码,其中还有对 Part A、B、C 代码的修改,完整代码详见
Github 仓库
将对 Log 的结构全部都抽离出来,之前只用 []LogEntry
来表示日志,现在构造一个新的 RaftLog
来表示维护的日志。
type RaftLog struct { snapLastIdx int snapLastTerm int snapshot []byte tailLog []LogEntry }
RaftLog
主要维护三部分的信息:
截断后压缩的日志 snapshot
;
后续的剩余日志 tailLog
;
两者的分界线;
为了避免边界判断,在 tailLog
中添加一个 dummy 日志,将
tailLog
中下标为 0 的日志留空,但是给它的 Term
赋值 snapLastTerm
。同时添加一些下标转换的工具函数,因为
tailLog
中日志的 index 不是真正日志的
Index
。
func NewLog (snapLastIdx, snapLastTerm int , snapshot []byte , entries []LogEntry) *RaftLog { rl := &RaftLog{ snapLastIdx: snapLastIdx, snapLastTerm: snapLastTerm, snapshot: snapshot, } rl.tailLog = make ([]LogEntry, 0 , 1 +len (entries)) rl.tailLog = append (rl.tailLog, LogEntry{ Term: snapLastTerm, }) rl.tailLog = append (rl.tailLog, entries...) return rl }func (rl *RaftLog) idx(logicIdx int ) int { if logicIdx < rl.snapLastIdx || logicIdx >= rl.size() { panic (fmt.Sprintf("%d is out of [%d, %d]" , logicIdx, rl.snapLastIdx, rl.size()-1 )) } return logicIdx - rl.snapLastIdx }func (rl *RaftLog) size() int { return rl.snapLastIdx + len (rl.tailLog) }func (rl *RaftLog) at(logicIdx int ) LogEntry { return rl.tailLog[rl.idx(logicIdx)] }func (rl *RaftLog) append (e LogEntry) { rl.tailLog = append (rl.tailLog, e) }func (rl *RaftLog) last() (int , int ) { i := len (rl.tailLog) - 1 return rl.snapLastIdx + i, rl.tailLog[i].Term }func (rl *RaftLog) appendFrom(logicPrevIndex int , entries []LogEntry) { rl.tailLog = append (rl.tailLog[:rl.idx(logicPrevIndex)+1 ], entries...) }func (rl *RaftLog) tail(startIdx int ) []LogEntry { if startIdx >= rl.size() { return nil } return rl.tailLog[rl.idx(startIdx):] }
除了构造函数,宕机重启,也会通过读取各个字段,进行反序列化构造
RaftLog
。
func (rl *RaftLog) readPersist(d *labgob.LabDecoder) error { var lastIdx int if err := d.Decode(&lastIdx); err != nil { return fmt.Errorf("decode last include index failed" ) } rl.snapLastIdx = lastIdx var lastTerm int if err := d.Decode(&lastTerm); err != nil { return fmt.Errorf("decode last include term failed" ) } rl.snapLastTerm = lastTerm var log []LogEntry if err := d.Decode(&log); err != nil { return fmt.Errorf("decode tailLog failed" ) } rl.tailLog = log return nil }func (rl *RaftLog) persist(e *labgob.LabEncoder) { e.Encode(rl.snapLastIdx) e.Encode(rl.snapLastTerm) e.Encode(rl.tailLog) }
应用层发出请求,要求 Raft 层在 index 处做一个快照,同时 index
之前的日志就可以释放掉了。在进行日志截断时,注意要新建一个数组,而不是使用下标切片运算,只有新建数组,才会对原数组释放引用,GC
才能够把原来的空间回收掉。
func (rl *RaftLog) doSnapshot(index int , snapshot []byte ) { idx := rl.idx(index) rl.snapLastTerm = rl.tailLog[idx].Term rl.snapLastIdx = index rl.snapshot = snapshot newLog := make ([]LogEntry, 0 , rl.size()-rl.snapLastIdx) newLog = append (newLog, LogEntry{ Term: rl.snapLastTerm, }) newLog = append (newLog, rl.tailLog[idx+1 :]...) rl.tailLog = newLog }
整个 InstallSnapshot
RPC 的流程,这部分的代码详见
raft_compaction.go
Leader 应用层调用 raft.Snapshot(index, snapshot)
函数,主要负责:
保存 snapshot
截断日志
持久化
Leader 在需要时使用该 snapshot 构造参数发送 RPC 给 Follower
Follower 收到 RPC 后替换本地日志,并将其持久化
Follower 通过 ApplyMsg
讲 snapshot 传递给 Follower
应用层
后续讨论一下注意点:
Follower 收到 snapshot apply 的时候完全
apply?那应用层如何进行去重?Snapshot 中的 kv 带 index 吗?
应用层 Apply snapshot 时,是覆盖式(全量式,因为 Snapshot 下标肯定从
0 开始) Apply;在 Apply log 时,是增量式的 Apply。两者不同。
因此在 Apply snapshot
时,并不需要去重(因为不是增量式的),直接替换掉当前状态机即可。
应用层调用 snapshot(index, snapshot)意味着 什么?
对应用层意味着:
应用已经做好了一个快照,但编解码方式只有应用层自己知道,Raft
层不感知。
应用层自己也会保存该快照,之后宕机重启后会先加载该快照。此时(宕机重启后)
Raft 层也要记得更新自己的 commitIndex 和 lastApplied。
对 Raft 层意味着:
应用层告诉 Raft 层,你可以把 index 及以前的日志给释放掉了。
Raft 层要保存下 Snapshot,万一其变为 Leader 之后需要给 Follower
发。
保存分为在内存中保存和持久化到外存。
需要对原流程修改的地方?
Leader 发送 entries 的时候,要先检查要发送的 PrevLogEntry
还在不在,如果不在了,需要先发 snapshot。
宕机重启后,lastApplied 怎么初始化?
如果有 snapshot 存在:则需要初始化为 snapshot 的
lastIncludedIndex。因为应用层肯定也是从自己 snapshot 中来恢复的。
在 Make 阶段同步的再 apply 一个 snapshot 到 appplyCh 中有可能会直接
block 住。
即使不阻塞,有时候也会遇到 snapshot decode error 报错。
如果没有 snapshot 存在:那就初始化为 0 。
如果初始化为 snapLastIndex,那前面的需要 apply 吗?
不需要,因为应用层自己会从自己保存的 snapshot 中恢复。