// ---------- for RequestVote ---------- type RequestVoteArgs struct { // Your data here (2A, 2B). Term int// candidate's curTerm CandidateId int// candidate requesting vote }
type RequestVoteReply struct { // Your data here (2A). Term int// currentTerm, for candidate to update itself VoteGranted bool// true means candidate received vote }
func(rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool { ok := rf.peers[server].Call("Raft.RequestVote", args, reply) return ok }
// ---------- for AppendEntries ---------- type AppendEntriesArgs struct { Term int// leader's curTerm LeaderId int// so follower can redirect clients }
type AppendEntriesReply struct { Term int// currentTerm, for leader to update itself Success bool// true if follower contained entry matching prevLogIndex and prevLogTerm }
在 type Raft struct 中增加了一些 paper 中提到的本实现需要用到的字段,包括上面提到的接受 RPC 参数的 channel:
1 2 3 4 5 6 7 8 9 10
// Your data here (2A, 2B, 2C). // Look at the paper's Figure 2 for a description of what // state a Raft server must maintain. curTerm int// current curTerm state RState // current state votedFor int// candidate id that received vote in current curTerm
voteChan chan voteParam // channel for vote request entryChan chan appendEntryParam // channel for entry request
有关 RState interface 和具体的实现,定义如下:
1 2 3 4 5 6 7
type RState interface { Run(tf *Raft) }
type Follower struct{} type Candidate struct{} type Leader struct{}
timer := time.NewTimer(electionTimeout()) for { select { case <-timer.C: // d 超时 case reply := <-replyChan: // g RequestVote 回答 case vote := <-rf.voteChan: // e RequestVote 请求 case entry := <-rf.entryChan: // f AppendEntries 请求 } } }
首先自增 curTerm,发送 RequestVote 请求其他节点的选票,随后开启循环等待事件驱动,其中 d,e,f 遵随 paper 所描述即可,只需要注意加锁释放锁的时机,这里简单贴一下代码:
rf.mu.Lock() if reply.Term > rf.curTerm { // received larger term, become follower rf.curTerm = reply.Term rf.votedFor = -1 rf.state = &Follower{} rf.mu.Unlock() return } elseif reply.Term == rf.curTerm && reply.VoteGranted { // received a grantVote grantedCnt++ if grantedCnt >= minVote { rf.state = &Leader{} rf.mu.Unlock() return } } rf.mu.Unlock()
leader.go
框架:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
func(l *Leader) Run(rf *Raft) { // TODO: send heartbeat // TODO: send AppendEntries RPCs to all other servers
// make heartbeat timer timer := time.NewTimer(heartbeatTimeout()) for { select { case <-timer.C: // h 超时 case reply := <-replyChan: // k AppendEntries 回答 case vote := <-rf.voteChan: // i RequestVote 请求 case entry := <-rf.entryChan: // j AppendEntries 请求 } } }