自学内容网 自学内容网

6.824/6.5840 Lab 5: Sharded Key/Value Service不完全指南

雄关漫道真如铁,

而今迈步从头越。

——《 忆秦娥⋅ 娄山关 》

完整代码见: https://github.com/SnowLegend-star/6.824

Part A: The Controller and Static Sharding (easy)

Part A有段话需要重点理解

Join RPC 由管理员用于添加新的副本组。它的参数是从唯一的、非零的副本组标识符(GID)到服务器名称列表的映射(a set of mappings)。分片控制器应通过创建一个包含新副本组的新配置来响应新配置应尽可能均匀地将分片分配到整个副本组集中,并且应移动尽可能少的分片以达到这个目标。如果 GID 不是当前配置的一部分,则 shardctrler 应允许重复使用该 GID(即应允许 GID 加入,然后离开,然后再次加入)。

同时,我这里重点梳理下shardCtrler在server端的几个重要概念:

  1. Config:每当server端处理一次operation(Join、Leave、Move)时,它的Config都会发生改变,Num线性递增。同时对于新的Config,要用深拷贝的方式从旧Config内复制Shards与Groups这两个元素。原因是直接用“=”对slice与map复制时,属于引用复制,复制后的新变量与被复制变量本质指向同一个容器。
  2. Replica group:common.go中提到“A Config (configuration) describes a set of replica groups, and the replica group responsible for each shard”,这里shardCtrler的结构其实和Spanner是类似的。多个replica group共同组成了shardCtrler,每个replica group内部又包含几个服务器。replica group内部的server通过底层raft协议进行通信,而replica group之间又通过rpc调用进行shard的分配迁移
  3. Shard:每个shard由一个replica group进行处理,一个replica group可以处理多个不同的shard。Part A我们要做的就是将shard分配给所有的replica group,而不需要考虑shard包含的数据应当如何传输迁移(Part B的内容)。

下面这张图就很很形象地说明shardCtrler的具体架构。 

 

了解shardCtrler的构造之后,其实part A就通透了,剩下的问题就是如何处理负载均衡这个问题——即如何通过移动最少的shard来实现Config的变更

想要解决这个问题,可以从最简单的情况着手,往shardCtrler中添加一个group后,shard该如何分配?再添加第二个、第三个呢?一个关键的思路是每个group获得的shards数量属于[average, average+1),多的也不再赘述。Talk is cheap, show you my code.

func (sc *ShardCtrler) rebalanceShards(newConfig *Config) {
groupNum := len(newConfig.Groups)      //这个集群里面有多少组 0 2 3直接算2组
hasAssignedExtra := make(map[int]bool) //记录每个gid是否已经被分配了extraShard
//组数重新归0的特殊情况
if groupNum == 0 {
return
}

//新的配置中,每个Server可以有拥有的shards的数量是:[average, average+1)
averageShards := NShards / groupNum
extraShards := NShards % groupNum
shardPerGroup := make(map[int]int)

//统计原来的Shards对应的gid的情况
for _, gid := range newConfig.Shards {
if gid != 0 {
shardPerGroup[gid]++
}
}

//获得升序的gid数组
groupList := make([]int, 0, groupNum)
for gid := range newConfig.Groups {
groupList = append(groupList, gid)
}
sort.Ints(groupList)

//遍历Shards设置可以被分配的shard   这段代码十分精妙
availableShards := []int{}
for shardId, gid := range newConfig.Shards {
if !hasAssignedExtra[gid] {
if gid == 0 || shardPerGroup[gid] > averageShards+boolToInt(extraShards > 0) {
availableShards = append(availableShards, shardId)
shardPerGroup[gid]--
newConfig.Shards[shardId] = 0
} else if shardPerGroup[gid] == averageShards+boolToInt(extraShards > 0) {
extraShards--
hasAssignedExtra[gid] = true
}
}

}

//将未分配的shard进行分配  gid0不参与分配
for _, gid := range groupList {
if gid == 0 {
continue
}
// 计算该 group 应分配的 shard 数量
expectedShards := averageShards
if extraShards > 0 {
expectedShards++
extraShards--
}

for shardPerGroup[gid] < expectedShards && len(availableShards) > 0 {
shard := availableShards[len(availableShards)-1]
availableShards = availableShards[:len(availableShards)-1]
newConfig.Shards[shard] = gid
shardPerGroup[gid]++
}
}


}

Part B: Shard Movement (hard)

整个Part B可以分为两个大块。第一个大块是对来自Clerk的command进行处理;第二个大块则是实现分片迁移。先分析处理command的工作流程,就拿Put(key, value)举例:

1、Clerk计算key所在的shardId,根据Config得到这篇shardId所在的gid。

2、遍历该group的所有servers,与其中的Leader进行通信

3、Leader进行去重判断和Config失效性判断后,将Put传入raft进行一致性同步

4、接收同步后的Put并进行处理,最后判断一次Config的时效性即可向Clerk返回。

值得一提的是如何实现每个server的kvstorage呢?

Q:同一个group内部的servers是如何同步kvstorage的呢?明确kvstorage和shardData的关系。

A:在同一个group内,Leader通过将来自Clerk的command发送至raft进行一致性同步,接下来所有的Server都可以从raft peer中收到这个command,再将这个command应用至自己的状态机。Nshards个shardData组成完整的kvstorage。

其他实现过程与part A类似,重点是处理分片迁移

先看实验说明里面的几句重点内容:

1、在配置更改期间实现分片迁移。确保副本组中的所有服务器在相同的操作序列点上执行迁移,以便它们要么同时接受客户端请求,要么同时拒绝客户端请求

可以在group进行分片迁移的时候,进行阻塞。

2、服务器需要在配置更改期间相互发送RPC以传输分片

看完实验说明,我们首先就要明确一点:无论是command还是分片迁移,都应该由group内的Leader进行处理,再与Follower进行一致性同步。这次给出的hints涉及内容跨度过大,就不率先分析hints了。

注意shardKV和shardCtrler的通信问题。

  1. Leader 从 shardCtrler 拉取配置:副本组的 Leader 定期从 shardCtrler 获取最新配置(约每100ms),检查当前配置是否与上次记录的配置不同。
  2. 分片迁移协调:一旦 Leader 检测到新的配置要求迁移某些分片,它会负责协调迁移工作。迁移数据的 RPC 请求由 Leader 发起,与其他组的 Leader 或组内的服务器通信。
  3. 同步迁移信息:为了确保一致性,Leader 将迁移指令或数据变更作为日志条目提交到其 Raft 共识模块中,然后由其他服务器应用此更改,确保整个组对分片的当前状态一致。

在实现分片迁移的时候,先搭好一个宏观的通信框架是很重要的。由于三个group不是同一时间查询Config,所以如果在shardCtrler在短时间内进行连续的配置变化时,三个group在同一个时间段查询得到的ConfigNum也不尽相同。例如在处理完Config2后,group 100查询到了Config3,group101查询到了Config3,而group102查询的是Config4。这时摆在我们面前的有两条路。

①一旦三组group的Config不同,则Config更旧的group重新查询最新的Config。但这样会忽略对旧Config的处理。在刚才的例子中,group100和group101发现自己的Config过时后,会跳过对Config3的处理,重新查询并处理Config4。

②三组group对每个Config都进行处理,而不是跳跃式处理Config。就像下图一般。

在三组group处理完Config2后,G100在第三次查询是Config8,而G101和G102都只查询到Config3。按照方案1,G101和G102会跳跃至Config8与G100同步,忽略中间几个Config。但是这种做法有两大弊端,首先是需要保存的中间状态明显更多。需要保存上一次成功同步处理的Config,还需要对已接收的shardData进行记录以便回退。其次,如果shardCtrler的状态处于持续的变化中,那三组group可能需要花费很长一段时间才能完全同步Config。

此时G100继续进入Config3进行处理,而不是令G101和G102迅速更新到Config8。三者处理完Config3后,继续处理Config4,以此类推。尽管这种方式可能要稍慢于前者,但是胜在需要保存的中间状态更少,处理过程更加的稳定。

我一开始就是选择的方案1,后面发现需要考虑的情况实在是太多了,不胜其烦,稍不留神就会出bug。

由于这次遇到的bug实在是太多,就挑连个我认为最具有代表性的来分享吧。

HandleShardData获取锁和applier获取锁直接冲突了,且是嵌套依赖。直接导致了死锁问题。

在group发送自己的shard时,我在这里对整个函数进行了加锁操作。但是,将operation提交至raft后,applier()接受来自raft的msg也需要获得锁。就会导致Handler()中的

index, _, _ := kv.rf.Start(operation)
result := kv.waitForResult(index)
if result.Err != OK {
kv.DebugLeader(dConfig, "Server %v同步validNum %v失败: %v", kv.me, kv.validConfigNum, result.Err)
}

 result永远的都无法得到结果,从而产生死锁。

总的来说这次的Lab确实是难度极大,容易遇到各种各样的问题。可恶的是这个问题就自己遇到了,不具有一般性,那就只能对着log慢慢看了,让我也是几经道心受损。最后,能用方案②就用方案②,方案①实现起来苦难重重,我用方案①写到了unreliable那个测试发现难以进行下去,最后也是重构代码采用方案②。


原文地址:https://blog.csdn.net/John_Snowww/article/details/144334072

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!