Persistence and Snapshots
Persistence: so that a server can restore its previous state after a restart, we persist the three states listed in Figure 2 of the Raft paper: currentTerm, votedFor, and log[].
Snapshots: to keep the log from growing without bound and consuming too much space, we use a snapshot mechanism and persist the snapshot to disk.
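For concreteness, here is a minimal sketch of the persistence routine, assuming the lab's labgob encoder and the Persister Save/ReadSnapshot API. The persist(false, nil) / persist(true, &args.Data) signature matches the calls that appear in the code further down, but the body here is illustrative rather than the exact implementation.

// Sketch only; assumes import "bytes" and the lab's labgob package.
// Serializes the three Figure 2 states and, optionally, a new snapshot.
func (rf *Raft) persist(withSnapshot bool, snapshot *[]byte) {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    e.Encode(rf.currentTerm)
    e.Encode(rf.voteFor)
    e.Encode(rf.log)
    state := w.Bytes()
    if withSnapshot && snapshot != nil {
        rf.persister.Save(state, *snapshot)
    } else {
        // keep whatever snapshot is already on disk
        rf.persister.Save(state, rf.persister.ReadSnapshot())
    }
}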
Problems Introduced by Snapshots
Once snapshots are introduced, physical indices and logical indices no longer coincide.
For example, before the snapshot:

logical index : 0  1  2  3  4
physical index: 0  1  2  3  4
log entry     : c1 c2 c3 c4 c5

After taking a snapshot up through index 2:

logical index : 3  4
physical index: 0  1
log entry     : c4 c5
As the example shows, after a snapshot the physical index and the logical index differ by the logical index of the first retained entry. Therefore, the logical index must be stored in each log entry, and every place in the code that manipulates indices (reading from the log, slicing, etc.) has to apply this correction.
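In the implementation below, rf.log[0] is additionally kept as a sentinel entry carrying the snapshot's last included index and term, so the translation is uniformly physical = logical - rf.log[0].Index. A minimal sketch of the helpers this implies (getFirstLog and getLastIndex are the names used later; entryAt is a hypothetical convenience wrapper added here for illustration):

// getFirstLog returns the sentinel entry; its Index/Term are the
// snapshot's last included index and term.
func (rf *Raft) getFirstLog() LogEntry {
    return rf.log[0]
}

// getLastIndex returns the logical index of the last entry in the log.
func (rf *Raft) getLastIndex() int {
    return rf.log[len(rf.log)-1].Index
}

// entryAt maps a logical index to the entry stored at it (hypothetical
// helper; the caller must ensure logicalIndex >= rf.getFirstLog().Index).
func (rf *Raft) entryAt(logicalIndex int) LogEntry {
    return rf.log[logicalIndex-rf.getFirstLog().Index]
}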
Snapshots introduce a second problem: when the leader's and a follower's logs disagree, the follower returns a conflictIndex that the leader uses to adjust prevIndex. If prevIndex is smaller than the logical index of the leader's first log entry, the entries at that position have already been compacted into the snapshot, so the leader must instead send an InstallSnapshot RPC to bring the follower up to date.
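Condensed into a predicate (a hypothetical helper; the same check appears inline in buildOutbound below):

// needsSnapshot reports whether the follower's next needed entry has already
// been compacted into the snapshot, so AppendEntries cannot be built and
// InstallSnapshot must be sent instead. The caller holds rf.mu.
func (rf *Raft) needsSnapshot(server int) bool {
    prevLogIndex := rf.nextIndex[server] - 1
    return prevLogIndex < rf.getFirstLog().Index
}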
Implementation
Architecture Optimization
The previous post noted that sendEntries was too tightly coupled, which caused lock-contention performance problems. The function breaks down into a few pieces of logic:
- deciding whether a snapshot needs to be sent, and building the AppendEntries or InstallSnapshot arguments
- actually sending the RPC
- handling the RPC reply
In addition, heartbeats are decoupled from real append operations, which makes it easier to control the sending frequency of each kind of RPC at its own granularity. The original single function is therefore split as follows:
// buildOutbound constructs either snapshot or append args for a given follower.
// withEntries controls whether to include log entries (append) or send an empty heartbeat.
// Returns: needSnapshot, snapshotArgs, appendArgs, valid (still leader in term when built).
func (rf *Raft) buildOutbound(server int, term int, withEntries bool) (bool, InstallSnapshotArgs, AppendEntriesArgs, bool) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if rf.state != Leader || rf.currentTerm != term {
        return false, InstallSnapshotArgs{}, AppendEntriesArgs{}, false
    }
    first := rf.getFirstLog()
    prevLogIndex := rf.nextIndex[server] - 1
    // Need snapshot
    if prevLogIndex < first.Index {
        snap := InstallSnapshotArgs{
            Term:             term,
            LeaderId:         rf.me,
            LastIncludeIndex: first.Index,
            LastIncludeTerm:  first.Term,
            Data:             rf.persister.ReadSnapshot(),
        }
        return true, snap, AppendEntriesArgs{}, true
    }
    prevLogTerm := rf.log[prevLogIndex-first.Index].Term
    leaderCommit := rf.commitIndex
    args := AppendEntriesArgs{
        Term:         term,
        LeaderID:     rf.me,
        PrevLogIndex: prevLogIndex,
        PrevLogTerm:  prevLogTerm,
        LeaderCommit: leaderCommit,
    }
    if withEntries && rf.nextIndex[server] <= rf.getLastIndex() {
        args.Entries = rf.log[rf.nextIndex[server]-first.Index:]
    } else {
        args.Entries = []LogEntry{}
    }
    return false, InstallSnapshotArgs{}, args, true
}

// handleAppendReply updates follower progress and handles term changes.
// Returns (done) whether replication to this follower can stop.
func (rf *Raft) handleAppendReply(server int, term int, args AppendEntriesArgs, reply AppendEntriesReply) (done bool) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if rf.state != Leader || rf.currentTerm != term {
        return true
    }
    if reply.Term > rf.currentTerm {
        rf.currentTerm = reply.Term
        rf.state = Follower
        rf.voteFor = -1
        rf.persist(false, nil)
        go rf.sendHeartbeat(false)
        return true
    }
    if reply.Success {
        if args.PrevLogIndex != rf.nextIndex[server]-1 {
            return true
        }
        if len(args.Entries) > 0 {
            rf.nextIndex[server] = args.PrevLogIndex + len(args.Entries) + 1
            rf.matchIndex[server] = rf.nextIndex[server] - 1
        }
        return true
    }
    // conflict resolution
    if reply.ConflictIndex <= rf.getFirstLog().Index {
        rf.nextIndex[server] = rf.getFirstLog().Index
    } else {
        rf.nextIndex[server] = reply.ConflictIndex
    }
    return false
}

// replicateToPeer keeps trying to bring one follower up to date.
// withEntries=true sends entries; false sends heartbeat-only (but may still install snapshot if needed).
func (rf *Raft) replicateToPeer(server int, withEntries bool, term int) {
    localWithEntries := withEntries
    for !rf.killed() {
        needSnap, snapArgs, appArgs, ok := rf.buildOutbound(server, term, localWithEntries)
        if !ok {
            return
        }
        if needSnap {
            reply := InstallSnapshotReply{}
            if rf.sendInstallSnapshot(server, &snapArgs, &reply) {
                rf.mu.Lock()
                if rf.state != Leader || rf.currentTerm != term {
                    rf.mu.Unlock()
                    return
                }
                if reply.Term > rf.currentTerm {
                    rf.currentTerm = reply.Term
                    rf.state = Follower
                    rf.voteFor = -1
                    rf.persist(false, nil)
                    rf.mu.Unlock()
                    rf.sendHeartbeat(false)
                    return
                }
                // successful install snapshot advances indices
                rf.nextIndex[server] = snapArgs.LastIncludeIndex + 1
                rf.matchIndex[server] = snapArgs.LastIncludeIndex
                rf.mu.Unlock()
                // after snapshot, try append entries
                localWithEntries = true
                continue
            }
            time.Sleep(50 * time.Millisecond)
            continue
        }
        // AppendEntries path
        reply := AppendEntriesReply{}
        if rf.sendAppendEntries(server, &appArgs, &reply) {
            done := rf.handleAppendReply(server, term, appArgs, reply)
            if done {
                return
            }
            // If this was a heartbeat and we hit a conflict, escalate to append
            localWithEntries = true
            continue
        }
        // RPC failed; retry
        time.Sleep(50 * time.Millisecond)
        continue
    }
}

// broadcastHeartbeat sends heartbeats (no log entries) to all followers.
func (rf *Raft) broadcastHeartbeat() {
    rf.mu.Lock()
    term := rf.currentTerm
    rf.mu.Unlock()
    for i := range rf.peers {
        if i == rf.me {
            continue
        }
        go rf.replicateToPeer(i, false, term)
    }
}

// broadcastAppend sends append entries (with log entries when available) to all followers.
func (rf *Raft) broadcastAppend() {
    rf.mu.Lock()
    term := rf.currentTerm
    rf.mu.Unlock()
    for i := range rf.peers {
        if i == rf.me {
            continue
        }
        go rf.replicateToPeer(i, true, term)
    }
}
Decoupling these pieces of logic reduces lock overhead and improves overall performance by more than 50%, most noticeably in the 3B and 3D tests.
RPC Handler
Note that the InstallSnapshot RPC handler must also handle stepping down when the terms do not match.
It also has to truncate the log correctly when the incoming snapshot requires it.
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
    rf.mu.Lock()
    // term handling
    if args.Term < rf.currentTerm {
        reply.Term = rf.currentTerm
        rf.mu.Unlock()
        return
    }
    if args.Term > rf.currentTerm {
        rf.currentTerm = args.Term
        rf.voteFor = -1
        rf.state = Follower
        rf.persist(false, nil)
    }
    reply.Term = rf.currentTerm
    // receiving a valid RPC resets the election timer
    rf.electionTimer.Reset(rf.getNewElectionTimeout())
    // If the snapshot is outdated (we already have an equal/newer snapshot), ignore it
    if args.LastIncludeIndex <= rf.getFirstLog().Index {
        rf.mu.Unlock()
        return
    }
    // Rebuild the log with a snapshot sentinel and an optional suffix to retain
    firstIdx := rf.getFirstLog().Index
    lastIdx := rf.getLastIndex()
    var newLog []LogEntry
    if args.LastIncludeIndex <= lastIdx {
        // There may be a suffix to keep
        cut := args.LastIncludeIndex - firstIdx
        // Guard: cut could be out of bounds if state is inconsistent
        if cut >= 0 && cut < len(rf.log) && rf.log[cut].Term == args.LastIncludeTerm {
            // keep the suffix strictly after the snapshot index
            suffix := rf.log[cut+1:]
            newLog = make([]LogEntry, 1+len(suffix))
            newLog[0] = LogEntry{Index: args.LastIncludeIndex, Term: args.LastIncludeTerm}
            copy(newLog[1:], suffix)
        } else {
            // terms conflict; drop everything and start fresh from the sentinel
            newLog = []LogEntry{{Index: args.LastIncludeIndex, Term: args.LastIncludeTerm}}
        }
    } else {
        // Snapshot goes beyond our last log entry; drop everything
        newLog = []LogEntry{{Index: args.LastIncludeIndex, Term: args.LastIncludeTerm}}
    }
    rf.log = newLog
    // bump commitIndex/lastApplied to at least the snapshot index
    if rf.commitIndex < args.LastIncludeIndex {
        rf.commitIndex = args.LastIncludeIndex
    }
    if rf.lastApplied < args.LastIncludeIndex {
        rf.lastApplied = args.LastIncludeIndex
    }
    // Persist the new state and snapshot atomically
    rf.persist(true, &args.Data)
    // deliver the snapshot to the service outside the lock
    msg := raftapi.ApplyMsg{
        SnapshotValid: true,
        Snapshot:      args.Data,
        SnapshotTerm:  args.LastIncludeTerm,
        SnapshotIndex: args.LastIncludeIndex,
    }
    rf.mu.Unlock()
    rf.applyCh <- msg // send the message outside the critical section!
}
Final Performance
=== RUN TestInitialElection3A
Test (3A): initial election (reliable network)...
... Passed -- time 3.0s #peers 3 #RPCs 54 #Ops 0
--- PASS: TestInitialElection3A (3.02s)
=== RUN TestReElection3A
Test (3A): election after network failure (reliable network)...
... Passed -- time 4.5s #peers 3 #RPCs 131 #Ops 0
--- PASS: TestReElection3A (4.51s)
=== RUN TestManyElections3A
Test (3A): multiple elections (reliable network)...
... Passed -- time 5.5s #peers 7 #RPCs 474 #Ops 0
--- PASS: TestManyElections3A (5.52s)
=== RUN TestBasicAgree3B
Test (3B): basic agreement (reliable network)...
... Passed -- time 0.7s #peers 3 #RPCs 18 #Ops 0
--- PASS: TestBasicAgree3B (0.66s)
=== RUN TestRPCBytes3B
Test (3B): RPC byte count (reliable network)...
... Passed -- time 1.5s #peers 3 #RPCs 50 #Ops 0
--- PASS: TestRPCBytes3B (1.53s)
=== RUN TestFollowerFailure3B
Test (3B): test progressive failure of followers (reliable network)...
... Passed -- time 4.9s #peers 3 #RPCs 127 #Ops 0
--- PASS: TestFollowerFailure3B (4.89s)
=== RUN TestLeaderFailure3B
Test (3B): test failure of leaders (reliable network)...
... Passed -- time 5.0s #peers 3 #RPCs 221 #Ops 0
--- PASS: TestLeaderFailure3B (4.99s)
=== RUN TestFailAgree3B
Test (3B): agreement after follower reconnects (reliable network)...
... Passed -- time 5.5s #peers 3 #RPCs 124 #Ops 0
--- PASS: TestFailAgree3B (5.51s)
=== RUN TestFailNoAgree3B
Test (3B): no agreement if too many followers disconnect (reliable network)...
... Passed -- time 3.5s #peers 5 #RPCs 218 #Ops 0
--- PASS: TestFailNoAgree3B (3.50s)
=== RUN TestConcurrentStarts3B
Test (3B): concurrent Start()s (reliable network)...
... Passed -- time 0.7s #peers 3 #RPCs 22 #Ops 0
--- PASS: TestConcurrentStarts3B (0.70s)
=== RUN TestRejoin3B
Test (3B): rejoin of partitioned leader (reliable network)...
... Passed -- time 4.1s #peers 3 #RPCs 167 #Ops 0
--- PASS: TestRejoin3B (4.10s)
=== RUN TestBackup3B
Test (3B): leader backs up quickly over incorrect follower logs (reliable network)...
... Passed -- time 17.1s #peers 5 #RPCs 3716 #Ops 0
--- PASS: TestBackup3B (17.13s)
=== RUN TestCount3B
Test (3B): RPC counts aren't too high (reliable network)...
... Passed -- time 2.1s #peers 3 #RPCs 60 #Ops 0
--- PASS: TestCount3B (2.06s)
=== RUN TestPersist13C
Test (3C): basic persistence (reliable network)...
... Passed -- time 4.0s #peers 3 #RPCs 80 #Ops 0
--- PASS: TestPersist13C (3.95s)
=== RUN TestPersist23C
Test (3C): more persistence (reliable network)...
... Passed -- time 12.5s #peers 5 #RPCs 385 #Ops 0
--- PASS: TestPersist23C (12.55s)
=== RUN TestPersist33C
Test (3C): partitioned leader and one follower crash, leader restarts (reliable network)...
... Passed -- time 1.8s #peers 3 #RPCs 42 #Ops 0
--- PASS: TestPersist33C (1.83s)
=== RUN TestFigure83C
Test (3C): Figure 8 (reliable network)...
... Passed -- time 32.9s #peers 5 #RPCs 1304 #Ops 0
--- PASS: TestFigure83C (32.90s)
=== RUN TestUnreliableAgree3C
Test (3C): unreliable agreement (unreliable network)...
... Passed -- time 3.6s #peers 5 #RPCs 1419 #Ops 0
--- PASS: TestUnreliableAgree3C (3.59s)
=== RUN TestFigure8Unreliable3C
Test (3C): Figure 8 (unreliable) (unreliable network)...
... Passed -- time 30.1s #peers 5 #RPCs 20544 #Ops 0
--- PASS: TestFigure8Unreliable3C (30.13s)
=== RUN TestReliableChurn3C
Test (3C): churn (reliable network)...
... Passed -- time 16.2s #peers 5 #RPCs 9140 #Ops 0
--- PASS: TestReliableChurn3C (16.15s)
=== RUN TestUnreliableChurn3C
Test (3C): unreliable churn (unreliable network)...
... Passed -- time 16.1s #peers 5 #RPCs 3029 #Ops 0
--- PASS: TestUnreliableChurn3C (16.11s)
=== RUN TestSnapshotBasic3D
Test (3D): snapshots basic (reliable network)...
... Passed -- time 3.4s #peers 3 #RPCs 516 #Ops 0
--- PASS: TestSnapshotBasic3D (3.36s)
=== RUN TestSnapshotInstall3D
Test (3D): install snapshots (disconnect) (reliable network)...
... Passed -- time 38.7s #peers 3 #RPCs 1673 #Ops 0
--- PASS: TestSnapshotInstall3D (38.70s)
=== RUN TestSnapshotInstallUnreliable3D
Test (3D): install snapshots (disconnect) (unreliable network)...
... Passed -- time 42.5s #peers 3 #RPCs 2014 #Ops 0
--- PASS: TestSnapshotInstallUnreliable3D (42.49s)
=== RUN TestSnapshotInstallCrash3D
Test (3D): install snapshots (crash) (reliable network)...
... Passed -- time 29.1s #peers 3 #RPCs 1217 #Ops 0
--- PASS: TestSnapshotInstallCrash3D (29.07s)
=== RUN TestSnapshotInstallUnCrash3D
Test (3D): install snapshots (crash) (unreliable network)...
... Passed -- time 31.5s #peers 3 #RPCs 1562 #Ops 0
--- PASS: TestSnapshotInstallUnCrash3D (31.49s)
=== RUN TestSnapshotAllCrash3D
Test (3D): crash and restart all servers (unreliable network)...
... Passed -- time 9.8s #peers 3 #RPCs 331 #Ops 0
--- PASS: TestSnapshotAllCrash3D (9.80s)
=== RUN TestSnapshotInit3D
Test (3D): snapshot initialization after crash (unreliable network)...
... Passed -- time 2.9s #peers 3 #RPCs 90 #Ops 0
--- PASS: TestSnapshotInit3D (2.91s)
PASS
ok 6.5840/raft1 333.975s