Raft基于6.824框架

Raft

6.824 的框架将实现 Raft 分为了四个 Part,分别是领导者选举、日志同步、状态持久化和日志压缩。

领导者选举(Part A)

这部分主要实现 Raft 选举和心跳逻辑(不携带日志的 AppendEntries),主要的任务:

  • 选出「唯一」的领导者,领导者选出后会持续进行心跳避免其他人发出选举;
  • 旧的领导者宕机或者网络故障无法触达时,选出新的领导者

将 PartA 的实现分为三个部分:

  • 状态转换:角色定义和转换函数
  • 选举逻辑:定义选举 RPC,构造周期性选举的 Loop
  • 心跳逻辑:定义心跳 RPC,当选 Leader 后发送心跳

先根据论文来定义需要用的基本数据结构:

// A Go object implementing a single Raft peer.
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()

// Your data here (PartA, PartB, PartC).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
role Role
currentTerm int
votedFor int // 在任期内是否投过票,没有投过则为 -1

electionStart time.Time // 每次选举计算时间点
electionTimeout time.Duration // 超时随机间隔
}

在工程实践中,currentTerm 一般会建议用指定位数的整形,比如 int32

修改函数 GetState,让测试框架能够正确拿到 Raft 节点的相关状态:

// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
// Your code here (PartA).
rf.mu.Lock()
defer rf.mu.Unlock()
return rf.currentTerm, rf.role == Leader
}

状态转换

状态是针对某一个节点 Peer 来说的,先定义一下角色类型:

type Role string

const (
Follower Role = "Follower"
Candidate Role = "Candidate"
Leader Role = "Leader"
)

为了实现这个状态机,可以定义三个转换函数:becomeCandidatebecomeLeaderbecomeFollower

// 角色转换状态机
// become a follower in `term`, term could not be decreased
func (rf *Raft) becomeFollowerLocked(term int) {
if term < rf.currentTerm {
LOG(rf.me, rf.currentTerm, DError, "Can't become Follower, lower term")
return
}

LOG(rf.me, rf.currentTerm, DLog, "%s -> Follower, For T%d->T%d",
rf.role, rf.currentTerm, term)

if term > rf.currentTerm {
rf.votedFor = -1
}
rf.role = Follower
rf.currentTerm = term
}

func (rf *Raft) becomeCandidateLocked() {
if rf.role == Leader {
LOG(rf.me, rf.currentTerm, DError, "Leader can't become Candidate")
return
}

LOG(rf.me, rf.currentTerm, DVote, "%s -> Candidate, For T%d->T%d",
rf.role, rf.currentTerm, rf.currentTerm+1)
rf.role = Candidate
rf.currentTerm++
rf.votedFor = rf.me
}

func (rf *Raft) becomeLeaderLocked() {
if rf.role != Candidate {
LOG(rf.me, rf.currentTerm, DLeader,
"%s, Only candidate can become Leader", rf.role)
return
}

LOG(rf.me, rf.currentTerm, DLeader, "%s -> Leader, For T%d",
rf.role, rf.currentTerm)
rf.role = Leader
}

由于涉及对 Raft 的全局状态进行修改,因此需要加锁,且希望在函数外部加锁,所以函数名都带有 Locked

选举 Loop

Candidate 要票的整体逻辑:

  • 选举 Loop:负责超时检查,来定时发送选举 RPC;
  • 单轮选举:变为 Candidate 后,向所有除自己的 Peer 之外发起一次要票过程;
  • 单次 RPC:针对每个 Peer 的 RequestVote 的请求响应进行处理;

选举 Loop:基本逻辑是在每次循环时,进行两项检查,「超时检查」和「角色检查」。

  • 超时检查:检查 选举 Timer 是否已经超时,只有超时才会真正发起选举,这里要注意的是在实现上会增加一个随机值,避免同意的超时间隔时间而导致一同发起选举。
  • 角色检查:判断是否为 Leader,如果自己已经是 Leader 不用发起选举。

第一步要实现的是设置随机超时时间和超时检测函数:

const (
electionTimeoutMin time.Duration = 250 * time.Millisecond
electionTimeoutMax time.Duration = 400 * time.Millisecond
)

// 重制选举时钟
func (rf *Raft) resetElectionTimerLocked() {
rf.electionStart = time.Now()
randRange := int64(electionTimeoutMax - electionTimeoutMin)
rf.electionTimeout = electionTimeoutMin + time.Duration(rand.Int63()%randRange)
}

func (rf *Raft) isElectionTimeoutLocked() bool {
return time.Since(rf.electionStart) > rf.electionTimeout
}

选举 Loop:当满足两个条件时,将角色装变为 Candidate,然后异步的发起选举(同步会造成主循环检查延迟):

func (rf *Raft) electionTicker() {
for !rf.killed() {
// Your code here (PartA)
// Check if a leader election should be started.
rf.mu.Lock()
if rf.role != Leader && rf.isElectionTimeoutLocked() {
rf.becomeCandidateLocked()
go rf.startElection(rf.currentTerm)
}
rf.mu.Unlock()

// pause for a random amount of time between 50 and 350
// milliseconds.
ms := 50 + (rand.Int63() % 300)
time.Sleep(time.Duration(ms) * time.Millisecond)
}
}

所有需要用到 Raft 全局变量的地方都需要加锁,但是不要在加锁的时候发送 RPC,这会造成长时间占用锁,同时增加发生死锁的可能性。

该 Loop 的生命周期和 Raft Peer 相同,在创建 Raft 实例时就在后台开始运行

func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
// ...
// start ticker goroutine to start elections
go rf.electionTicker()

return rf
}

单轮选举:一轮选举需要对除自己以外所有的 Peer 都发起一轮 RPC 投票请求,由于需要访问全局变量,所以仍然需要加锁。

func (rf *Raft) startElection(term int) bool {
votes := 0
askVoteFromPeer := func(peer int, args *RequestVoteArgs) {
// send rpc to `peer` and handle the response
}

rf.mu.Lock()
defer rf.mu.Unlock()

// every time locked
if rf.contextLostLocked(Candidate, term) {
return false
}

for peer := 0; peer < len(rf.peers); peer++ {
if peer == rf.me {
votes++
continue
}

args := &RequestVoteArgs{
Term: term,
CandidateId: rf.me,
}
go askVoteFromPeer(peer, args)
}

return true
}

上下文检查:这里的上下文指的是 TermRole,也就是说在一个任期内,只要角色没有发生变化,就可以推进状态机:

func (rf *Raft) contextLostLocked(role Role, term int) bool {
return !(rf.currentTerm == term && rf.role == role)
}

如果上下文已经被更改,那么就应该及时的退出 goroutine,避免对状态机做出错误的改动。

单次 RPC:对每个 Peer 发送 RPC 请求,包括 构造 RPC 参数发送 RPC 请求等待结果并处理结果 三个部分。

askVoteFromPeer := func(peer int, args *RequestVoteArgs) {
// send RPC
reply := &RequestVoteReply{}
ok := rf.sendRequestVote(peer, args, reply)

// handle the response
rf.mu.Lock()
defer rf.mu.Unlock()
if !ok {
LOG(rf.me, rf.currentTerm, DDebug, "Ask vote from %d, Lost or error", peer)
return
}

// align the term
if reply.Term > rf.currentTerm {
rf.becomeFollowerLocked(reply.Term)
return
}

// check the context
if rf.contextLostLocked(Candidate, term) {
LOG(rf.me, rf.currentTerm, DVote, "Lost context, abort RequestVoteReply in T%d", rf.currentTerm)
return
}

// count votes
if reply.VoteGranted {
votes++
}
if votes > len(rf.peers)/2 {
rf.becomeLeaderLocked()
go rf.replicationTicker(term)
}
}

在进行 RPC 的时候,要随时进行对齐任期的操作。只有 Term 相同,才可以进行状态机的更新,否则就要优先对齐 Term:

  • 如果对方 Term 比自己小:无视请求,通过返回值表达自己的 Term;
  • 如果对方 Term 比自己大:跟上对方的 Term,维持 Follower;

对齐 Term 之后,还需要检查上下文。这里就有一个问题,什么时候需要进行上下文的检查?如果一段逻辑在一把锁的保护下不断的做,那并不需要检查上下文。但这里需要进行一些耗时的操作(如等待 RPC),此时需要把锁临时断开。之后,重新上锁的时候,就需要考虑状态是否满足之前期望的状态。因为锁中间断开了,有些状态可能被其他线程修改了。

所有 Peer 在运行时都会收到要票的 RPC,此时就会执行 RequestVote 回调函数,也就是 Peer 收到 RPC 要票请求时的处理逻辑:

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (PartA, PartB).
rf.mu.Lock()
defer rf.mu.Unlock()

// align the term
reply.Term = rf.currentTerm
if rf.currentTerm > args.Term {
LOG(rf.me, rf.currentTerm, DVote, "-> S%d, Reject vote, higher term, T%d>T%d", args.CandidateId, rf.currentTerm, args.Term)
reply.VoteGranted = false
return
}
if rf.currentTerm < args.Term {
rf.becomeFollowerLocked(args.Term)
}

// check the votedFor
if rf.votedFor != -1 && rf.votedFor != args.CandidateId {
LOG(rf.me, rf.currentTerm, DVote, "-> S%d, Reject, Already voted S%d", args.CandidateId, rf.votedFor)
reply.VoteGranted = false
return
}

reply.VoteGranted = true
rf.votedFor = args.CandidateId
rf.resetElectionTimerLocked()
LOG(rf.me, rf.currentTerm, DVote, "-> S%d", args.CandidateId)
}

回调函数实现的第一点是先做对齐 Term,在对齐 Term 的过程中,Peer 有可能重置 votedFor。这样即使本来由于已经投过票了而不能再投票,但提高任期重置后,在新的 Term 里,就又有一票可以投了。

另外一点是,论文提到只有投票给对方后,才能重置选举 Timer。也就是说,在没有投出票时,是不允许重置选举 Timer 的。

心跳逻辑

「心跳」「日志复制」是相同的 RPC,他们的区别在于是否携带日志。和选举逻辑相对,也分三个层次来实现 RPC 发送方:

  • 心跳 Loop:当选 Leader 需要不断的向其他 Peer 发送心跳包;
  • 单论心跳:对除自己以外所有的 Peer 发送一个心跳 RPC;
  • 单次 RPC:对某个 Peer 发送心跳包,处理 RPC 返回值;

心跳 Loop:由于不需要构造随机超时机制,因此心跳 Loop 会比选举 Loop 简单一些:

func (rf *Raft) replicationTicker(term int) {
for !rf.killed() {
ok := rf.startReplication(term)
if !ok {
return
}

time.Sleep(replicateInterval)
}
}

startReplication 有一个返回值,用于检测是否还处于「上下文」的环境中,如果一旦发现 Raft 已经不是这个 Term 的 Leader 了,就需要退出 Loop,不再广播心跳包。

单轮心跳:Leader 会给除自己外的所有其他 Peer 发送心跳。在发送前要检测“上下文”是否还在,如果不在了,就直接返回 false,告诉外层循环 replicationTicker 可以终止循环了。因此,startReplication 的返回值也可以视作是否成功的发起了一轮心跳。

func (rf *Raft) startReplication(term int) bool {
replicateToPeer := func(peer int, args *AppendEntriesArgs) {
// send heartbeat RPC and handle the reply
}

rf.mu.Lock()
defer rf.mu.Unlock()
if rf.contextLostLocked(Leader, term) {
LOG(rf.me, rf.currentTerm, DLeader, "Leader[T%d] -> %s[T%d]", term, rf.role, rf.currentTerm)
return false
}

for peer := 0; peer < len(rf.peers); peer++ {
if peer == rf.me {
continue
}

args := &AppendEntriesArgs{
Term: term,
LeaderId: rf.me,
}

go replicateToPeer(peer, args)
}

return true
}

单次 RPC:在不处理日志时,心跳的返回值比较简单,只需要对齐 Term 即可。

replicateToPeer := func(peer int, args *AppendEntriesArgs) {
reply := &AppendEntriesReply{}
ok := rf.sendAppendEntries(peer, args, reply)

rf.mu.Lock()
defer rf.mu.Unlock()
if !ok {
LOG(rf.me, rf.currentTerm, DLog, "-> S%d, Lost or crashed", peer)
return
}
// align the term
if reply.Term > rf.currentTerm {
rf.becomeFollowerLocked(reply.Term)
return
}
}

心跳接收方在收到心跳包时,只要 Leader 的 Term 不小于自己,就对其进行认可,变为 Follower,并重置选举时钟。

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()

reply.Term = rf.currentTerm
reply.Success = false
// align the term
if args.Term < rf.currentTerm {
LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject log", args.LeaderId)
return
}
rf.becomeFollowerLocked(args.Term)


// reset the timer
rf.resetElectionTimerLocked()
reply.Success = true
}

日志同步(Part B)

在 Part A 的基础上,加上对日志的复制,主要的任务有:

  • 在进行领导选举时,加入日志的比较
  • 领导者收到应用层发来的日志后,要通过心跳同步给所有的 Follower
  • 在收到多数 Follower 同步成功的请求后,Leader 要推进 CommitIndex,并让所有 Peer Apply

先对 AppendEntries 结构体进行完善,按照论文的 Figure 2 来补全2 RPC 涉及到的结构体,根据 ApplyMsg 所需字段,定义 LogEntry

type LogEntry struct {
Term int // Log 的 Term
CommandValid bool // Command 是否需要 Apply
Command interface{} // 存储到状态机里的指令
}

type AppendEntriesArgs struct {
Term int
LeaderId int

// 匹配点的试探
PrevLogIndex int
PrevLogTerm int
Entries []LogEntry
}

完善 Raft 中相关的字段,相对应字段的初始化这里就不再赘述。

// Leader 才会使用,每个 peer 的视图
nextIndex []int
matchIndex []int

matchIndex 和 nextIndex 是 Leader 节点用来管理 Follower 日志同步的两个关键字段,他们的作用分别是:

  • nextIndex:预期下一次要发送给 Follower 的日志条目索引(初始值为 Leader 的最后日志索引 +1)。
  • matchIndex:已知 Follower 已复制的最高日志条目索引(初始值为 0,表示未匹配)。

一个 Peer 收到 RPC,需要做以下判断来决定是否能够增加日志:

  • 如果 prevLog 不匹配,则返回 Success = false
  • 如果 prevLog 匹配,则将参数中 Entries 追加到本地日志,返回 Success = true

所谓的日志匹配就是「相同 Index 的地方,Term 相同」,index 和 term 能唯一确定一条日志。Raft 保证一个 Term 中最多有(也可能没有)一个 Leader,然后只有该 Leader 能确定日志顺序且同步日志。

为了比较 Follower 和 Leader 的日志一致性, PrevLogTermPrevLogIndex,主要用于检测在 Leader 的视图里,Follower 的日志是否正确。这两个字段分别代表:

  • Leader 发送日志条目时,Follower 前一个日志条目的索引(用于定位插入位置)。
  • Leader 发送日志条目时,Follower 前一个日志条目的任期号(用于验证一致性)。

之前的心跳只是负责压制其他的 Peer 来发起选举,因此不用给 Leader 返回 reply,但日志复制就需要返回 reply 了。

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()

// For debug
LOG(rf.me, rf.currentTerm, DDebug, "<- S%d, Receive log, Prev=[%d]T%d, Len()=%d", args.LeaderId, args.PrevLogIndex, args.PrevLogTerm, len(args.Entries))
// replay initialized
reply.Term = rf.currentTerm
reply.Success = false

// Term 对齐
if args.Term < rf.currentTerm {
LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject Log, Higher term, T%d<T%d", args.LeaderId, args.Term, rf.currentTerm)
return
}
if args.Term >= rf.currentTerm {
rf.becomeFollowerLocked(args.Term)
}

// 没有足够的日志
if args.PrevLogIndex >= len(rf.log) {
LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject Log, Follower log too short, Len:%d <= Prev:%d", args.LeaderId, len(rf.log), args.PrevLogIndex)
return
}
// 对应位置上的日志 Term 不匹配
if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject Log, Prev log not match, [%d]: T%d != T%d", args.LeaderId, args.PrevLogIndex, rf.log[args.PrevLogIndex].Term, args.PrevLogTerm)
return
}

// append the leader logs to local
// 将 Leader 的 冲突位置之后的所有新条目 追加到 Follower 的日志中。
conflictLogIndex := args.PrevLogIndex + 1
conflictEntryIdx := 0
for ; conflictLogIndex < len(rf.log) && conflictEntryIdx < len(args.Entries); conflictLogIndex++, conflictEntryIdx++ {
if rf.log[conflictLogIndex].Term != args.Entries[conflictEntryIdx].Term {
break
}
}
if conflictEntryIdx < len(args.Entries) {
rf.log = append(rf.log[:conflictLogIndex], args.Entries[conflictEntryIdx:]...)
LOG(rf.me, rf.currentTerm, DLog2, "Follower append logs: (%d, %d]", conflictLogIndex-1, args.PrevLogIndex+len(args.Entries))
}
reply.Success = true

// TODO: handle the args.LeaderCommit

// reset the election timer, promising not start election in some interval
rf.resetElectionTimerLocked()
}

对于日志复制 RPC 的发送方来说,需要增加两部分逻辑:

  • 每个 RPC 发送前的参数构造;
  • 每个 RPC 返回值处理:
    • 如果复制成功,看看是否能够更新 Leader 的 commitIndex
    • 如果复制失败,需要将匹配点回退,继续试探

匹配点回退使用的是快速回溯算法,核心思想是快速回退到冲突任期(Term)的开始位置,从而高效解决 Leader 和 Follower 之间的日志不一致问题。它的设计目标是 减少不必要的 RPC 重试次数,直接跳过整个冲突任期的所有条目,而非逐条回退(线性回溯)。

func (rf *Raft) startReplication(term int) bool {
replicateToPeer := func(peer int, args *AppendEntriesArgs) {
reply := &AppendEntriesReply{}
ok := rf.sendAppendEntries(peer, args, reply)

rf.mu.Lock()
defer rf.mu.Unlock()
if !ok {
LOG(rf.me, rf.currentTerm, DLog, "-> S%d, Lost or crashed", peer)
return
}

// Term 对齐
if reply.Term > rf.currentTerm {
rf.becomeFollowerLocked(reply.Term)
return
}

// 检测到了日志冲突,进行快速回溯,不只是一个 index 的回退
// 从 nextIndex[peer] - 1 即上一次尝试同步的 PrevLogIndex 开始回退
if !reply.Success {
idx := rf.nextIndex[peer] - 1
term := rf.log[idx].Term
// 找到该 term 的第一个 log
for idx > 0 && rf.log[idx].Term == term {
idx--
}
rf.nextIndex[peer] = idx + 1
LOG(rf.me, rf.currentTerm, DLog, "Log not matched in %d, Update next=%d", args.PrevLogIndex, rf.nextIndex[peer])
return
}

// Peer Apply 日志成功了,更新 Leader 的视图
rf.matchIndex[peer] = args.PrevLogIndex + len(args.Entries)
rf.nextIndex[peer] = rf.matchIndex[peer] + 1

// TODO: need compute the new commitIndex here,
// but we leave it to the other chapter
}

rf.mu.Lock()
defer rf.mu.Unlock()

// 上下文检查
if rf.contextLostLocked(Leader, term) {
LOG(rf.me, rf.currentTerm, DLog, "Lost Leader[%d] to %s[T%d]", term, rf.role, rf.currentTerm)
return false
}

// 给所有节点发送 RPC
for peer := 0; peer < len(rf.peers); peer++ {
if peer == rf.me {
// Don't forget to update Leader's matchIndex
rf.matchIndex[peer] = len(rf.log) - 1
rf.nextIndex[peer] = len(rf.log)
continue
}


prevIdx := rf.nextIndex[peer] - 1
prevTerm := rf.log[prevIdx].Term

args := &AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: prevIdx,
PrevLogTerm: prevTerm,
Entries: append([]LogEntry(nil), rf.log[prevIdx+1:]...),
LeaderCommit: rf.commitIndex,
}
LOG(rf.me, rf.currentTerm, DDebug, "-> S%d, Send log, Prev=[%d]T%d, Len()=%d", peer, args.PrevLogIndex, args.PrevLogTerm, len(args.Entries))
go replicateToPeer(peer, args)
}

return true
}

这部分的最终目的,就是要更新 matchIndex。进而依据所有 Peer 的 matchIndex 来算 commitIndex 。Leader 有了 commitIndex 之后,再将其下发给各个 Follower,指导其各自更新本地 commitIndex 进而 apply。

除此之外,在当选 Leader 时,就需要更新对其他 Peer 的视图。

func (rf *Raft) becomeLeaderLocked() {
if rf.role != Candidate {
LOG(rf.me, rf.currentTerm, DError, "Only Candidate can become Leader")
return
}

LOG(rf.me, rf.currentTerm, DLeader, "Become Leader in T%d", rf.currentTerm)
rf.role = Leader
for peer := 0; peer < len(rf.peers); peer++ {
rf.nextIndex[peer] = len(rf.log)
rf.matchIndex[peer] = 0
}
}

在有了日志的概念后,在进行 Leader 选举的时候,就需要进行日志比较了。在进行选举的时候,确保只有具有比大多数 Peer 更新日志的候选人才能当选 Leader。

那么因此会引申出一个问题,对于两个 Peer 来说,谁的日志才算是最新的。论文的描述是这样的:

  • Term 高的 Peer 日志越新;
  • Term 相同,Index 大的 Peer 日志越新;
// 作为 peer 收到 Candidate RPC 比较日志谁更新
func (rf *Raft) isMoreUpDateLocked(candidateIndex, candidateTerm int) bool {
l := len(rf.log)
lastIndex, lastTerm := l-1, rf.log[l-1].Term

LOG(rf.me, rf.currentTerm, DVote, "Compare last log, Me: [%d]T%d, Candidate: [%d]T%d", lastIndex, lastTerm, candidateIndex, candidateTerm)
if lastTerm != candidateTerm {
return lastTerm > candidateTerm
}
return lastIndex > candidateIndex
}

在投票 RPC 中最后一条日志的 Term 和 Index 信息,用于日志比较:

type RequestVoteArgs struct {
// Your data here (PartA, PartB).
Term int
CandidateId int
LastLogIndex int
LastLogTerm int
}

在构造 RPC 请求的参数时,加上这两个字段:

args := &RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogIndex: l-1,
LastLogTerm: rf.log[l-1].Term,
}

接收方在进行对齐 Term,如果没有投过票,通过日志比较来决定是否能给该 Peer 投票。

// example RequestVote RPC handler.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// ...
// check for votedFor
if rf.votedFor != -1 && rf.votedFor != args.CandidateId {
LOG(rf.me, rf.currentTerm, DVote, "-> S%d, Reject Voted, Already voted to S%d", args.CandidateId, rf.votedFor)
return
}

// check log, only grante vote when the candidates have more up-to-date log
if rf.isMoreUpToDateLocked(args.LastLogIndex,args.LastLogTerm) {
LOG(rf.me, rf.currentTerm, DVote, "-> S%d, Reject Vote, S%d's log less up-to-date", args.CandidateId)
return
}
// ...
}

下面实现关于日志 Apply 的逻辑,也就是 Leader 提交日志的部分。Apply 只有在 commitIndex 增大的时候才会触发,因此可以利用一下 golang 的语法 sync.Cond,使用唤醒机制,在 commitIndex 增大后唤醒 applyTicker

根据 Figure 2 中补充两个字段:

  • commitIndex:全局日志提交进度
  • lastApplied:本地 Peer 日志提交进度
  • applyCond:用于唤醒 applyTicker
  • applyCh:将 Apply 工作流构造的 Msg 传递给应用层

ApplyTicker 工作流的具体步骤:

  • 构造所有待 apply 的 ApplyMsg
  • 遍历这些 msgs,进行 apply
  • 更新 lastApplied
func (rf *Raft) applyTicker() {
for !rf.killed() {
rf.mu.Lock()
rf.applyCond.Wait()

entries := make([]LogEntry, 0)
// should start from rf.lastApplied+1 instead of rf.lastApplied
for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ {
entries = append(entries, rf.log[i])
}
rf.mu.Unlock()

for i, entry := range entries {
rf.applyCh <- ApplyMsg{
CommandValid: entry.CommandValid,
Command: entry.Command,
CommandIndex: rf.lastApplied + 1 + i,
}
}

rf.mu.Lock()
LOG(rf.me, rf.currentTerm, DApply, "Apply log for [%d, %d]", rf.lastApplied+1, rf.lastApplied+len(entries))
rf.lastApplied += len(entries)
rf.mu.Unlock()
}
}

在 Leader 给其他 Peer AppendEntries 成功后,会更新 rf.matchIndex

replicateToPeer := func(peer int, args *AppendEntriesArgs) {
// ......

// update the commmit index if log appended successfully
rf.matchIndex[peer] = args.PrevLogIndex + len(args.Entries)
rf.nextIndex[peer] = rf.matchIndex[peer] + 1 // important: must update
majorityMatched := rf.getMajorityIndexLocked()
if majorityMatched > rf.commitIndex {
LOG(rf.me, rf.currentTerm, DApply, "Leader update the commit index %d->%d", rf.commitIndex, majorityMatched)
rf.commitIndex = majorityMatched
rf.applyCond.Signal()
}
}

在每次更新 matchIndex 后,依据此全局匹配点视图,可以算出多数 Peer 的匹配点,进而更新 Leader 的 CommitIndex。这里使用排序后找中位数的方法。由于排序会改变原数组,因此要把 matchIndex 复制一份再进行排序。

func (rf *Raft) getMajorityIndexLocked() int {
// TODO(spw): may could be avoid copying
tmpIndexes := make([]int, len(rf.matchIndex))
copy(tmpIndexes, rf.matchIndex)
sort.Ints(tmpIndexes)
majorityIdx := (len(tmpIndexes) - 1) / 2
LOG(rf.me, rf.currentTerm, DDebug, "Match index after sort: %v, majority[%d]=%d", tmpIndexes, majorityIdx, tmpIndexes[majorityIdx])
return tmpIndexes[majorityIdx] // min -> max
}

如果 commitIndex 更新,则唤醒 apply 工作流,可以将 msg 更新到本地日志了。Leader 通过下一次的 AppendEntries 的 RPC 参数将 commitIndex 发送给每个 Follower,因此需要再 AppendEntriesArgs 增加这个参数。

每个 Follower 通过 AppendEntries 的回调函数收到 Leader 发来的 LeaderCommit,来更新本地的 CommitIndex,进而驱动 Apply 工作流开始干活。

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
// ...
// update the commit index if needed and indicate the apply loop to apply
if args.LeaderCommit > rf.commitIndex {
LOG(rf.me, rf.currentTerm, DApply, "Follower update the commit index %d->%d", rf.commitIndex, args.LeaderCommit)
rf.commitIndex = args.LeaderCommit
if rf.commitIndex >= len(rf.log) {
rf.commitIndex = len(rf.log) - 1
}
rf.applyCond.Signal()
}
// ...
}

状态持久化(Part C)

目前能够做到在「不宕机」下的领导者选举。但是在某个 Peer 异常重启后,是不能正常重新加入集群的。为此需要将 Raft 的关键信息定时持久化,重启后加载,以保证重新加入集群。

主要是两个序列化和反序列化函数:

func (rf *Raft) persistString() string {
return fmt.Sprintf("T%d, VotedFor: %d, Log: [0: %d)", rf.currentTerm, rf.votedFor, len(rf.log))
}

func (rf *Raft) persistLocked() {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.votedFor)
e.Encode(rf.log)
raftstate := w.Bytes()
// leave the second parameter nil, will use it in PartD
rf.persister.Save(raftstate, nil)
}

func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 {
return
}

var currentTerm int
var votedFor int
var log []LogEntry

r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
if err := d.Decode(&currentTerm); err != nil {
LOG(rf.me, rf.currentTerm, DPersist, "Read currentTerm error: %v", err)
return
}
rf.currentTerm = currentTerm

if err := d.Decode(&votedFor); err != nil {
LOG(rf.me, rf.currentTerm, DPersist, "Read votedFor error: %v", err)
return
}
rf.votedFor = votedFor

if err := d.Decode(&log); err != nil {
LOG(rf.me, rf.currentTerm, DPersist, "Read log error: %v", err)
return
}
rf.log = log
LOG(rf.me, rf.currentTerm, DPersist, "Read Persist %v", rf.persistString())
}

主要需要持久化的有三个字段:currentTermvotedForlog,为什么只需要持久化这三个字段:

  • currentTerm:重启后一定要知道自己之前任期到了哪里,因为任期是状态机中正确行为的基础;
  • votedFor:如果在某个任期已经投过票了,重启之后不能投票,否则会多一票;
  • log:日志作为数据,理所当然要持久化;

另外 persist 函数需要访问全局状态,因此需要在临界区调用,不然会出现竞争。在所有修改全局状态的地方,修改完成之后都需要做一下持久化。

日志回溯优化

之前在 Leader 发送同步日志时,采用的是回退的方式,而每次回退都需要下一个 RPC 才能发送给 Follower,也就是说每次回退都要经过一个 replicateInterval。如果回退算法不够好,某些情况下,冲突探测时间会特别长。「Leader 只基于自己的日志进行回退的情况下,一次不管是回退一个 index 还是回退一个 term 效果都不好」。因此,可以考虑让 Follower 提供一些信息,告诉 Leader 自己的日志目前到哪里了。

AppendEntriesReply 增加两个额外的字段,让 Follower 携带一些日志冲突的信息。

type AppendEntriesReply struct {
Term int
Success bool

ConflictIndex int
ConflictTerm int
}

Follower 算法大致流程:

  • 如果 Follower 日志过短,那么 ConflictTerm 置空,ConflictIndex = len(rf.log)
  • 否则,将 ConflictTerm 设置为 Follower 在 Leader.PrevLogIndex 处日志的 Term;ConflictIndexConflictTerm 的第一条日志;

第一条做法的目的在于,如果 Follower 日志过短,可以提示 Leader 迅速回退到 Follower 日志的末尾。第二条做法的目的在于,如果 Follower 存在 Leader.PrevLog,但不匹配,则将对应的 Term 的日志全部跳过。

// --- rf.AppendEntries in raft_replication.go

// return failure if prevLog not matched
if args.PrevLogIndex >= len(rf.log) {
reply.ConflictIndex = len(rf.log)
reply.ConflictTerm = InvalidTerm
return
}
if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
reply.ConflictTerm = rf.log[args.PrevLogIndex].Term
reply.ConflictIndex = rf.firstLogFor(reply.ConflictTerm)
return
}

Leader 端使用上面两个新增字段的算法如下:

  • 如果 ConflictTerm 为空,说明 Follower 日志太短,直接将 nextIndex 赋值为 ConflictIndex 迅速回退到 Follower 日志末尾。
  • 否则,以 Leader 日志为准,跳过 ConflictTerm 的所有日志;如果发现 Leader 日志中不存在 ConflictTerm 的任何日志,则以 Follower 为准跳过 ConflictTerm,即使用 ConflictIndex
// --- rf.startReplication.replicateToPeer in raft_replication.go
if !reply.Success {
prevNext := rf.nextIndex[peer]
if reply.ConflictTerm == InvalidTerm {
rf.nextIndex[peer] = reply.ConflictIndex
} else {
firstTermIndex := rf.firstLogFor(reply.ConflictTerm)
if firstTermIndex != InvalidIndex {
rf.nextIndex[peer] = firstTermIndex
} else {
rf.nextIndex[peer] = reply.ConflictIndex
}
}
// avoid the late reply move the nextIndex forward again
rf.nextIndex[peer] = MinInt(prevNext, rf.nextIndex[peer])
return
}

空 Term 和 空 Index 可以用一个特殊值来表示。

封装一个函数用于在日志中找到指定 term 的第一条日志。

// --- in raft.go
func (rf *Raft) firstLogFor(term int) int {
for i, entry := range rf.log {
if entry.Term == term {
return i
} else if entry.Term > term {
break
}
}
return InvalidIndex
}

为了避免 Reply 乱序到达,导致探测点乱序前进和回退,需要保证 nextIndex 是单调递减的。

// --- rf.startReplication in raft_replication.go
if !reply.Success {
prevIndex := rf.nextIndex[peer]
// avoid the late reply move the nextIndex forward again
if rf.nextIndex[peer] > prevIndex {
rf.nextIndex[peer] = prevIndex
}
return
}

日志压缩(Part D)

对于一个长时间运行的 Raft 系统,如果持续收到日志回会遇到「空间不足」「启动过慢」等问题。日志无限的追加下去,本地硬盘空间可能存不下。重启时需要重放所有日志,如果日志过长,重放过程将会持续很久不能正常对外提供服务。

最容易想到的方法就是定期对日志做快照。针对某个日志 entry 做了快照之后,该 entry 及之前的日志都可以被截断。为什么需要这么做呢?相比于日志,快照的存储更加紧凑。日志记录的是对状态机修改的事件,比如 update k1 = v1,而快照通常记录的是 k1: v1,一条数据会在日志中出现多次,但是在快照中只会保存其最后一次的状态。

截断的同时也会带来新的问题,如果一个 term 较低的节点加入了集群,Leader 需要发送过往的日志给该 Follower,发现过往日志已经被截断了。这时候就需要引入一个新的 RPC InstallSnapshot,将 Leader 的 Snapshot 全量同步给 Follower,再做之后的增量同步。

下面只贴出关键代码,其中还有对 Part A、B、C 代码的修改,完整代码详见 Github 仓库

将对 Log 的结构全部都抽离出来,之前只用 []LogEntry 来表示日志,现在构造一个新的 RaftLog 来表示维护的日志。

type RaftLog struct {
// 记录前一个日志的界限
// 最后一条日志的 index
snapLastIdx int
// 最后一条日志的 term
snapLastTerm int

// 日志压缩的前半段,从 [1, snapLastIdx] 开始的日志
snapshot []byte

// 日志的后半段,从 (snapLastIdx, snapLastIdx + len(tailLog) - 1] 开始的日志
// tailLog[0] 是一条 dummy 日志,存储的是 snapLastTerm
// 这里用的值类型,可能存在拷贝带来的性能问题
tailLog []LogEntry
}

RaftLog 主要维护三部分的信息:

  1. 截断后压缩的日志 snapshot;
  2. 后续的剩余日志 tailLog;
  3. 两者的分界线;

为了避免边界判断,在 tailLog 中添加一个 dummy 日志,将 tailLog 中下标为 0 的日志留空,但是给它的 Term 赋值 snapLastTerm。同时添加一些下标转换的工具函数,因为 tailLog 中日志的 index 不是真正日志的 Index

func NewLog(snapLastIdx, snapLastTerm int, snapshot []byte, entries []LogEntry) *RaftLog {
rl := &RaftLog{
snapLastIdx: snapLastIdx,
snapLastTerm: snapLastTerm,
snapshot: snapshot,
}

// make the len = 0, cap = 1 + len(entries)
rl.tailLog = make([]LogEntry, 0, 1+len(entries))
rl.tailLog = append(rl.tailLog, LogEntry{
Term: snapLastTerm,
})
rl.tailLog = append(rl.tailLog, entries...)

return rl
}

// 存储在 tailLog 的 index 不是实际日志的 index
// 需要做一个 index 的转换计算
func (rl *RaftLog) idx(logicIdx int) int {
// 超出了 tailLog 的界限
if logicIdx < rl.snapLastIdx || logicIdx >= rl.size() {
panic(fmt.Sprintf("%d is out of [%d, %d]", logicIdx, rl.snapLastIdx, rl.size()-1))
}
return logicIdx - rl.snapLastIdx
}

func (rl *RaftLog) size() int {
return rl.snapLastIdx + len(rl.tailLog)
}

func (rl *RaftLog) at(logicIdx int) LogEntry {
return rl.tailLog[rl.idx(logicIdx)]
}

// 封装 append 操作
func (rl *RaftLog) append(e LogEntry) {
rl.tailLog = append(rl.tailLog, e)
}

func (rl *RaftLog) last() (int, int) {
i := len(rl.tailLog) - 1
return rl.snapLastIdx + i, rl.tailLog[i].Term
}

func (rl *RaftLog) appendFrom(logicPrevIndex int, entries []LogEntry) {
rl.tailLog = append(rl.tailLog[:rl.idx(logicPrevIndex)+1], entries...)
}

func (rl *RaftLog) tail(startIdx int) []LogEntry {
if startIdx >= rl.size() {
return nil
}

return rl.tailLog[rl.idx(startIdx):]
}

除了构造函数,宕机重启,也会通过读取各个字段,进行反序列化构造 RaftLog

// 宕机重启需要做解序列化
func (rl *RaftLog) readPersist(d *labgob.LabDecoder) error {
var lastIdx int
if err := d.Decode(&lastIdx); err != nil {
return fmt.Errorf("decode last include index failed")
}
rl.snapLastIdx = lastIdx

var lastTerm int
if err := d.Decode(&lastTerm); err != nil {
return fmt.Errorf("decode last include term failed")
}
rl.snapLastTerm = lastTerm

var log []LogEntry
if err := d.Decode(&log); err != nil {
return fmt.Errorf("decode tailLog failed")
}
rl.tailLog = log

return nil
}

func (rl *RaftLog) persist(e *labgob.LabEncoder) {
e.Encode(rl.snapLastIdx)
e.Encode(rl.snapLastTerm)
e.Encode(rl.tailLog)
}

应用层发出请求,要求 Raft 层在 index 处做一个快照,同时 index 之前的日志就可以释放掉了。在进行日志截断时,注意要新建一个数组,而不是使用下标切片运算,只有新建数组,才会对原数组释放引用,GC 才能够把原来的空间回收掉。

// 从 App Layer 从 Raft Layer 传递
func (rl *RaftLog) doSnapshot(index int, snapshot []byte) {
idx := rl.idx(index)

rl.snapLastTerm = rl.tailLog[idx].Term
rl.snapLastIdx = index
rl.snapshot = snapshot

newLog := make([]LogEntry, 0, rl.size()-rl.snapLastIdx)
newLog = append(newLog, LogEntry{
Term: rl.snapLastTerm,
})
newLog = append(newLog, rl.tailLog[idx+1:]...)
rl.tailLog = newLog
}

整个 InstallSnapshot RPC 的流程,这部分的代码详见 raft_compaction.go

  1. Leader 应用层调用 raft.Snapshot(index, snapshot) 函数,主要负责:
    1. 保存 snapshot
    2. 截断日志
    3. 持久化
  2. Leader 在需要时使用该 snapshot 构造参数发送 RPC 给 Follower
  3. Follower 收到 RPC 后替换本地日志,并将其持久化
  4. Follower 通过 ApplyMsg 讲 snapshot 传递给 Follower 应用层

后续讨论一下注意点:

Follower 收到 snapshot apply 的时候完全 apply?那应用层如何进行去重?Snapshot 中的 kv 带 index 吗?

  1. 应用层 Apply snapshot 时,是覆盖式(全量式,因为 Snapshot 下标肯定从 0 开始) Apply;在 Apply log 时,是增量式的 Apply。两者不同。
  2. 因此在 Apply snapshot 时,并不需要去重(因为不是增量式的),直接替换掉当前状态机即可。

应用层调用 snapshot(index, snapshot)意味着 什么?

  1. 对应用层意味着:
    1. 应用已经做好了一个快照,但编解码方式只有应用层自己知道,Raft 层不感知。
    2. 应用层自己也会保存该快照,之后宕机重启后会先加载该快照。此时(宕机重启后) Raft 层也要记得更新自己的 commitIndex 和 lastApplied。
  2. 对 Raft 层意味着:
    1. 应用层告诉 Raft 层,你可以把 index 及以前的日志给释放掉了。
    2. Raft 层要保存下 Snapshot,万一其变为 Leader 之后需要给 Follower 发。
    3. 保存分为在内存中保存和持久化到外存。

需要对原流程修改的地方?

  1. Leader 发送 entries 的时候,要先检查要发送的 PrevLogEntry 还在不在,如果不在了,需要先发 snapshot。

宕机重启后,lastApplied 怎么初始化?

  1. 如果有 snapshot 存在:则需要初始化为 snapshot 的 lastIncludedIndex。因为应用层肯定也是从自己 snapshot 中来恢复的。
    1. 在 Make 阶段同步的再 apply 一个 snapshot 到 appplyCh 中有可能会直接 block 住。
    2. 即使不阻塞,有时候也会遇到 snapshot decode error 报错。
  2. 如果没有 snapshot 存在:那就初始化为 0 。
    1. 如果初始化为 snapLastIndex,那前面的需要 apply 吗?
    2. 不需要,因为应用层自己会从自己保存的 snapshot 中恢复。