
MIT-6.824 lab1

Date: 2019-02-18 13:34:58

GitHub: https://github.com/haoweiz/MIT-6.824

Part1:

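  Part 1 is fairly simple: we only need to modify the doMap and doReduce functions, and most of the work is reading and writing JSON files in Go. First, a quick look at how Part 1 is tested. The Sequential tests look like this: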

func TestSequentialSingle(t *testing.T) {
    mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc)
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)
    cleanup(mr)
}

func TestSequentialMany(t *testing.T) {
    mr := Sequential("test", makeInputs(5), 3, MapFunc, ReduceFunc)
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)
    cleanup(mr)
}

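  makeInputs(M int) splits the numbers from 0 up to 100000 evenly across M input files. The lab requires each input file to be split into N intermediate files (N = nReduce), so the doMap phase produces M*N files in total. We first pass each line of the input file through mapF (which is really MapFunc from test_test.go) and collect the resulting key/value pairs in the slice keyvalue. We then call getStartEnd to obtain the range of keyvalue that belongs in the file for reduce task Number, and write that range out with a JSON Encoder. Note that this splits the pairs by position rather than by ihash(key) mod nReduce as the lab comments suggest. That is enough to pass the Part 1 tests, but in general it would not keep equal keys in the same reduce file.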

// Imports used by the code below: bufio, encoding/json, io, log, os

// Get the start and end index of part Number if we divide Total key/value pairs into nReduce parts
func getStartEnd(Number, nReduce, Total int) (start, end int) {
    part := Total / nReduce
    start = Number * part
    if Number == nReduce-1 {
        // The last part also takes the remainder
        end = Total
    } else {
        end = (Number + 1) * part
    }
    return
}

// doMap manages one map task: it reads one of the input files
// (inFile), calls the user-defined map function (mapF) for that file's
// contents, and partitions the output into nReduce intermediate files.
func doMap(
    jobName string, // the name of the MapReduce job
    mapTaskNumber int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce task that will be run ("R" in the paper)
    mapF func(file string, contents string) []KeyValue,
) {
    //
    // You will need to write this function.
    //
    // The intermediate output of a map task is stored as multiple
    // files, one per destination reduce task. The file name includes
    // both the map task number and the reduce task number. Use the
    // filename generated by reduceName(jobName, mapTaskNumber, r) as
    // the intermediate file for reduce task r. Call ihash() (see below)
    // on each key, mod nReduce, to pick r for a key/value pair.
    //
    // mapF() is the map function provided by the application. The first
    // argument should be the input file name, though the map function
    // typically ignores it. The second argument should be the entire
    // input file contents. mapF() returns a slice containing the
    // key/value pairs for reduce; see common.go for the definition of
    // KeyValue.
    //
    // Look at Go's ioutil and os packages for functions to read
    // and write files.
    //
    // Coming up with a scheme for how to format the key/value pairs on
    // disk can be tricky, especially when taking into account that both
    // keys and values could contain newlines, quotes, and any other
    // character you can think of.
    //
    // One format often used for serializing data to a byte stream that the
    // other end can correctly reconstruct is JSON. You are not required to
    // use JSON, but as the output of the reduce tasks *must* be JSON,
    // familiarizing yourself with it here may prove useful. You can write
    // out a data structure as a JSON string to a file using the commented
    // code below. The corresponding decoding functions can be found in
    // common_reduce.go.
    //
    //   enc := json.NewEncoder(file)
    //   for _, kv := ... {
    //     err := enc.Encode(&kv)
    //
    // Remember to close the file after you have written all the values!
    //

    // Read inFile line by line and collect all keys and values in keyvalue
    var keyvalue []KeyValue
    fi, err := os.Open(inFile)
    if err != nil {
        log.Fatal("doMap Open: ", err)
    }
    defer fi.Close()
    br := bufio.NewReader(fi)
    for {
        line, _, err := br.ReadLine()
        if err == io.EOF {
            break
        }
        if err != nil {
            // Any other read error would otherwise loop forever
            log.Fatal("doMap ReadLine: ", err)
        }
        keyvalue = append(keyvalue, mapF(inFile, string(line))...)
    }

    // Divide keyvalue into nReduce parts and save them in nReduce files
    for r := 0; r != nReduce; r++ {
        // Use reduceName so the file name follows jobName instead of a hardcoded "test"
        file, err := os.Create(reduceName(jobName, mapTaskNumber, r))
        if err != nil {
            log.Fatal("doMap Create: ", err)
        }
        start, end := getStartEnd(r, nReduce, len(keyvalue))
        enc := json.NewEncoder(file)
        for _, kv := range keyvalue[start:end] {
            if err := enc.Encode(&kv); err != nil {
                log.Fatal("doMap Encode: ", err)
            }
        }
        file.Close()
    }
}
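
The lab comments quoted in doMap suggest choosing the reduce task for each pair with ihash(key) mod nReduce, instead of slicing keyvalue by position. Below is a minimal, self-contained sketch of that idea. KeyValue is redeclared so the example compiles on its own, ihash is a stand-in assumed to behave like the lab's helper (FNV-1a here), and partitionByHash is a hypothetical name that is not part of the lab code.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// KeyValue is redeclared here so the sketch stands alone.
type KeyValue struct {
	Key   string
	Value string
}

// ihash is a stand-in for the lab's ihash(); assumption: FNV-1a over the key,
// masked so the result is non-negative.
func ihash(s string) int {
	h := fnv.New32a()
	h.Write([]byte(s))
	return int(h.Sum32() & 0x7fffffff)
}

// partitionByHash (hypothetical helper) puts each pair into bucket
// ihash(Key) % nReduce, so equal keys always share a bucket.
func partitionByHash(kvs []KeyValue, nReduce int) [][]KeyValue {
	buckets := make([][]KeyValue, nReduce)
	for _, kv := range kvs {
		r := ihash(kv.Key) % nReduce
		buckets[r] = append(buckets[r], kv)
	}
	return buckets
}

func main() {
	kvs := []KeyValue{{"a", "1"}, {"b", "1"}, {"a", "1"}, {"c", "1"}}
	for r, bucket := range partitionByHash(kvs, 3) {
		fmt.Println(r, bucket)
	}
}
```

In doMap, bucket r would then be encoded into the file named by reduceName(jobName, mapTaskNumber, r). The benefit over a positional split is that a single reduce task sees every value for a given key, which a word-count style job depends on.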

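  For doReduce, we need to read nMap intermediate files, decode all of their key/value pairs, and re-encode them into outFile. This version copies the pairs through unchanged: it does not sort by key or call reduceF.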

// Imports used by the code below: encoding/json, io, log, os

// doReduce manages one reduce task: it reads the intermediate
// key/value pairs (produced by the map phase) for this task, sorts the
// intermediate key/value pairs by key, calls the user-defined reduce function
// (reduceF) for each key, and writes the output to disk.
func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTaskNumber int, // which reduce task this is
    outFile string, // write the output here
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {
    //
    // You will need to write this function.
    //
    // You'll need to read one intermediate file from each map task;
    // reduceName(jobName, m, reduceTaskNumber) yields the file
    // name from map task m.
    //
    // Your doMap() encoded the key/value pairs in the intermediate
    // files, so you will need to decode them. If you used JSON, you can
    // read and decode by creating a decoder and repeatedly calling
    // .Decode(&kv) on it until it returns an error.
    //
    // You may find the first example in the golang sort package
    // documentation useful.
    //
    // reduceF() is the application's reduce function. You should
    // call it once per distinct key, with a slice of all the values
    // for that key. reduceF() returns the reduced value for that key.
    //
    // You should write the reduce output as JSON encoded KeyValue
    // objects to the file named outFile. We require you to use JSON
    // because that is what the merger that combines the output
    // from all the reduce tasks expects. There is nothing special about
    // JSON -- it is just the marshalling format we chose to use. Your
    // output code will look something like this:
    //
    // enc := json.NewEncoder(file)
    // for key := ... {
    //     enc.Encode(KeyValue{key, reduceF(...)})
    // }
    // file.Close()
    //

    // Create outFile; every decoded pair is written to it
    file, err := os.Create(outFile)
    if err != nil {
        log.Fatal("doReduce Create: ", err)
    }
    defer file.Close()
    enc := json.NewEncoder(file)

    // Read the intermediate file of every map task m for this reduce task
    for m := 0; m != nMap; m++ {
        // Use reduceName so the file name follows jobName instead of a hardcoded "test"
        fi, err := os.Open(reduceName(jobName, m, reduceTaskNumber))
        if err != nil {
            log.Fatal("doReduce Open: ", err)
        }
        dec := json.NewDecoder(fi)
        for {
            var kv KeyValue
            err = dec.Decode(&kv)
            if err == io.EOF {
                break
            }
            if err != nil {
                log.Fatal("doReduce Decode: ", err)
            }
            if err := enc.Encode(&kv); err != nil {
                log.Fatal("doReduce Encode: ", err)
            }
        }
        fi.Close()
    }
}

  The tests pass.



Original post: https://www.cnblogs.com/haoweizh/p/10395016.html
