本次实验的目标是构建一个基于 Raft 的分布式键值存储服务(KVRaft)。这个服务需要保证线性一致性,并能够在网络分区、服务器崩溃等各种故障场景下正确运行。
实验的核心挑战在于:
- 如何在 Raft 之上实现精确一次(exactly-once)语义
- 如何处理客户端重试带来的重复请求
- 如何正确实现版本控制机制
- 如何在各种故障场景下保持系统的正确性
信息流动过程如下:
1
2
客户端 -> KVServer.Get/Put (RPC) -> RSM.Submit -> Raft ->
-> 日志复制到多数节点 -> 应用到状态机 -> KVServer.DoOp -> 返回结果
服务器端设计
KVServer 的核心数据结构包括:
1
2
3
4
5
6
7
8
9
10
11
12
13
type KVServer struct {
me int
rsm *rsm.RSM
mu sync.Mutex
// 存储层
data map[string]string
versions map[string]rpc.Tversion
// 去重层
lastApplied map[int64]int64 // clientId -> 最后处理的 seqNum
lastResults map[int64]any // clientId -> 缓存的响应
}
存储层负责维护实际的键值数据和版本号。去重层则用于实现精确一次语义,防止重复请求被多次执行。
客户端设计
客户端需要为每个请求分配唯一标识:
1
2
3
4
5
type Clerk struct {
clientId int64 // 客户端唯一标识
seqNum int64 // 单调递增的序列号
lastLeader int // 上次成功的 leader 索引
}
每个请求都携带 (clientId, seqNum) 二元组,服务器通过这个二元组来识别和去重。
核心实现细节
DoOp 方法:状态机的执行逻辑
DoOp 是整个系统的核心,它在每个服务器上以相同的顺序执行相同的操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (kv *KVServer) DoOp(req any) any {
kv.mu.Lock()
defer kv.mu.Unlock()
switch args := req.(type) {
case rpc.PutArgs:
// 去重检查
if lastSeq, exists := kv.lastApplied[args.ClientId]; exists {
if args.SeqNum <= lastSeq {
if args.SeqNum == lastSeq {
// 精确重复,返回缓存结果
return kv.lastResults[args.ClientId]
}
// 旧请求,返回错误避免重复执行
return rpc.PutReply{Err: rpc.ErrVersion}
}
}
// 版本检查
if kv.versions[args.Key] != args.Version {
reply := rpc.PutReply{Err: rpc.ErrVersion}
if args.SeqNum > lastSeq {
kv.lastApplied[args.ClientId] = args.SeqNum
kv.lastResults[args.ClientId] = reply
}
return reply
}
// 执行更新
kv.data[args.Key] = args.Value
kv.versions[args.Key]++
reply := rpc.PutReply{Err: rpc.OK}
if args.SeqNum > lastSeq {
kv.lastApplied[args.ClientId] = args.SeqNum
kv.lastResults[args.ClientId] = reply
}
return reply
}
}
关键点:
- 所有修改都在锁保护下进行,确保原子性
- 去重检查必须在执行操作之前完成
- 只有当
seqNum > lastSeq时才更新缓存,避免缓存被旧请求污染
RPC 处理:连接客户端和状态机
RPC 处理器作为桥梁,将客户端请求提交给 Raft:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (kv *KVServer) Put(args *rpc.PutArgs, reply *rpc.PutReply) {
if kv.killed() {
reply.Err = rpc.ErrWrongLeader
return
}
err, value := kv.rsm.Submit(*args)
if err == rpc.ErrWrongLeader {
reply.Err = rpc.ErrWrongLeader
return
}
if result, ok := value.(rpc.PutReply); ok {
*reply = result
}
}
rsm.Submit 会将请求提交给 Raft,等待其被复制和应用,然后返回 DoOp 的执行结果。
客户端重试逻辑
客户端需要处理各种错误情况并正确重试:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (ck *Clerk) Put(key string, value string, version rpc.Tversion) rpc.Err {
ck.seqNum++
args := rpc.PutArgs{
Key: key,
Value: value,
Version: version,
ClientId: ck.clientId,
SeqNum: ck.seqNum,
}
serverIndex := ck.lastLeader
isFirstAttempt := true
for {
var reply rpc.PutReply
ok := ck.clnt.Call(ck.servers[serverIndex], "KVServer.Put", &args, &reply)
if ok {
switch reply.Err {
case rpc.OK:
ck.lastLeader = serverIndex
return rpc.OK
case rpc.ErrVersion:
if isFirstAttempt {
return rpc.ErrVersion
} else {
return rpc.ErrMaybe
}
case rpc.ErrWrongLeader:
isFirstAttempt = false
serverIndex = (serverIndex + 1) % len(ck.servers)
time.Sleep(10 * time.Millisecond)
}
} else {
isFirstAttempt = false
serverIndex = (serverIndex + 1) % len(ck.servers)
time.Sleep(10 * time.Millisecond)
}
}
}
关键的语义:
- 第一次尝试收到 ErrVersion:确定操作未执行,返回 ErrVersion
- 重试后收到 ErrVersion:无法确定操作是否执行(可能第一次成功了但响应丢失),返回 ErrMaybe
Bug 查找与解决过程
问题发现
运行完整测试套件时,TestConcurrent4B 测试出现间歇性失败:
1
Fatal: Reliable: Wrong number of puts: server 127 clnts &{155 4}
这表示服务器的版本号是 127,但客户端报告了 155 次成功的 Put 操作和 4 次 Maybe 结果。两者相差 28 次,这意味着有些 Put 被客户端计数为成功,但实际上并未在服务器上执行。
调试策略
首先启用详细日志,记录每次 DoOp 的执行情况:
1
2
DPrintf("[Server %d DoOp] Put: clientId=%d seqNum=%d key=%s value=%s version=%d",
kv.me, clientId, seqNum, args.Key, args.Value, args.Version)
多次运行测试,捕获失败的日志文件进行分析。
第一个发现:旧请求的存在
在日志中发现了大量”OLD REQUEST”的警告:
1
2
[Server 3 DoOp] Put: OLD REQUEST detected! seqNum=49 < lastSeq=50
[Server 4 DoOp] Put: OLD REQUEST detected! seqNum=49 < lastSeq=50
这说明确实存在 seqNum < lastSeq 的情况。这是因为 Raft 会将所有提交的日志条目复制到所有服务器,即使某个请求在客户端看来已经完成,它仍会在所有服务器上被应用。
最初的代码有一个严重缺陷:
1
2
3
// 错误的实现
kv.lastApplied[clientId] = seqNum // 无条件更新
kv.lastResults[clientId] = reply
如果先处理 seqNum=19,再处理延迟到达的 seqNum=17,会导致 lastApplied 从 19 退化到 17,破坏了去重机制。
修复方法是只在 seqNum 更大时才更新缓存:
1
2
3
4
if lastSeq, exists := kv.lastApplied[clientId]; !exists || seqNum > lastSeq {
kv.lastApplied[clientId] = seqNum
kv.lastResults[clientId] = reply
}
第二个发现:客户端 ID 冲突
修复了缓存更新问题后,测试仍然失败。继续分析日志,发现了关键线索:
1
2
[Server 0 DoOp] Put: clientId=1763449207747363000 seqNum=1 value={"Id":0,...}
[Server 0 DoOp] Put: clientId=1763449207747363000 seqNum=1 value={"Id":1,...}
相同的 clientId 和 seqNum,但是 value 中的 Id 不同。这说明有两个不同的客户端使用了相同的 clientId。
追查客户端 ID 的生成代码:
1
2
// 问题代码
clientId: time.Now().UnixNano()
当多个 Clerk 在同一纳秒内并发创建时,它们会获得相同的时间戳,导致 clientId 冲突。这是问题的根本原因。
最终解决方案
使用加密随机数生成器来确保 clientId 的唯一性:
1
2
3
4
5
6
7
8
9
10
11
12
13
func nrand() int64 {
var buf [8]byte
rand.Read(buf[:])
return int64(binary.LittleEndian.Uint64(buf[:]))
}
func MakeClerk(...) kvtest.IKVClerk {
ck := &Clerk{
clientId: nrand(),
// ...
}
return ck
}
crypto/rand 提供的随机数质量足够高,即使并发创建成千上万个客户端也不会产生冲突。
经验总结
分布式系统中的 ID 生成
在分布式系统中,生成全局唯一 ID 是一个常见需求。几种常见方案:
- 时间戳:简单但在高并发下会冲突
- 时间戳 + 计数器:需要额外的并发控制
- UUID:字符串形式占用空间大
- 加密随机数:本实验采用的方案,简单可靠
去重机制的复杂性
实现去重比预想的复杂:
- 需要考虑请乱序到达的情况
- 缓存更新需要谨慎,避免被旧数据污染
- 理想情况下应该为每个 seqNum 都缓存结果,但这需要考虑内存管理
本实验采用了只缓存最后一个结果的简化方案,配合”拒绝旧请求”的策略来确保正确性。
调试分布式系统的方法
- 结构化日志:记录关键的状态转换和决策点
- 可重现性:通过种子控制随机性,便于复现问题
- 日志分析:使用 grep、awk 等工具分析大量日志
- 分离测试:先跑单个测试,发现问题后再跑完整套件
Raft 上构建服务的关键点
- 线性化读:即使是读操作也需要通过 Raft 达成共识(本实验的要求)
- 精确一次语义:通过客户端 ID + 序列号实现去重
- 错误处理:区分”肯定失败”和”可能成功”(ErrVersion vs ErrMaybe)
- 状态一致性:DoOp 在所有服务器上执行相同的操作序列