这个是 MIT 6.824 分布式系统(Distributed Systems)2022 年的 lab1,lab 链接。字有点多,需要花些时间耐心读一下。
个人认为这个 Lab 在对 Golang 有一定了解的情况下不是很难实现,一定要把题意理解清楚,画一个大致框架再动手。我用了两天时间,第一天主要是读题(再次吐槽一下字有点多)和画大致框架,第二天花了一下午几个小时写了代码。Debug 没有难度,可能是我运气比较好,一把过2333
下面开始讲个人的实现,代码量不大,算上注释大致 200 行。
复述
实现 Google 曾经使用的 MapReduce 的一个简单版本。具体的,修改mr
目录下的 coordinator.go
,rpc.go
,worker.go
三个文件实现 MapReduce
思路
主要是修改 coordinator.go
和 worker.go
,分别对于 pdf 文档中的 Master 和 Worker。前者负责调度任务,比如如何分配文件,如何规定超时任务的重新安排等等;后者负责实际执行 map
和 reduce
函数。
由于实际编写的是 coordinator 和 woker 的分布式版本,我们需要模拟这两个函数实际是在不同的机器上运行的,所以 coordinator 和 worker 两者的通信方式需要使用 RPC。于是,我们需要简单定义一下两者的通信方式:
- 我们需要知道,需要先启动单个 coordinator,随后再启动多个 worker,这个测试脚本为我们做了,我们只需要了解。
- worker 通过 RPC 向 coordinator 发送请求,表示“我现在空闲,请给我一个任务”。
- 由于 coordinator 已经注册了 RPC 服务(我们无需关注),其可以收到来自 worker 的 RPC 请求,我们从 coordinator 的任务队列中取出一个任务发送给 worker,并同时启动一个用于接收 worker 完成任务的方法和一个超时计时器。前者用于当 worker 完成任务时通知 coordinator;后者用于当 worker 没有按时完成任务时(paper 中解释的,可能 worker 崩溃了,或者网络阻塞等等),用于将任务重新安排给一个 worker。
- 当 worker 通过 RPC 获取到任务时,进行实际的工作。如果任务出错,直接终止等待下个任务即可,因为 coordinator 的计时器会帮我们重新安排这个任务;如果任务成功,再通过 RPC 告知 coordinator 任务已完成。若对 coordinator 的 RPC 调用失败,我们认为 coordinator 已经传输了所有的任务已退出,worker 可以终止。
简单的流程图如下:
程序的流程是:
- 我们通过对每个原始文件的内容作为 map 函数的参数输入,得到一组
KeyValue
,对于不同的Key
通过ihash(key) % nReduce
计算出不同的key
的reduceId
将这个结果保存在mr-{taskId}-{reduceId}
这个中间文件(intermediate)中。 - 当所有原始文件全部生成完对应的
mr-{taskId}-{reduceId}
文件后,coordinator 开始分配 reduce 函数的任务,于是每个 worker 只需要查找全部对应的reduceId
的文件(具体的:mr-*-{reduceId}
)进行合并处理和reduce
调用即可。对于每个reduceId
,生成对应的mr-out-{reduceId}
文件。
简单来说,就是:pg*.txt
–> mr-1-0
, mr-1-1
, …, mr-2-0
, mr-2-1
, … –> mr-out-0
, mr-out-1
,… 这个流程。
代码修改
由于没有删除的代码,下面只列出添加的代码:
rpc.go
首先添加一些 rpc 的定义:
1 | type Task struct { |
worker.go
1 | // 添加的 import |
下面是 Worker
函数的定义,全是感情,没有任何 Go 的技巧 :)
1 | func Worker(mapf func(string, string) []KeyValue, |
coordinator.go
个人认为 coordinator.go
的实现颇具技巧,网络上很多实现有大量评论代码“很不 Go”,没有 Go 的风格。
我虽说不是 Go 的高手,但也有一段时间具体学习了一下 Go 这门语言,参加了一些开源项目,对自己的代码风格还有有些信心,在这里自卖自夸一下2333,话不多说,先是 import:
1 | import ( |
对 Coordinator
结构体的修改,添加了一些字段用于操作:
1 | type Coordinator struct { |
MakeCoordinator
函数的设计:
1 | func MakeCoordinator(files []string, nReduce int) *Coordinator { |
NeedWork
RPC 函数:
1 | func (c *Coordinator) NeedWork(args *NeedWorkArgs, reply *NeedWorkReply) error { |
FinishWork
RPC 函数:
1 | func (c *Coordinator) FinishWork(args *FinishWorkArgs, reply *FinishWorkReply) error { |
总结
通过这个实验我大致搞明白了一个简单的分布式系统由哪些部分组成。至此,分布式系统的 Lab1 完成,我认为难点不在代码的实现,而是题目的理解。更干净的代码实现见本人的 github commit history