//// main/mrworker.go calls this function.//funcWorker(mapffunc(string,string)[]KeyValue,reduceffunc(string,[]string)string){// Your worker implementation here.// uncomment to send the Example RPC to the coordinator.// CallExample()for{args:=WorkerArgs{}reply:=WorkerReply{}ok:=call("Coordinator.AllocateTask",&args,&reply)if!ok{// the coordinator may diedlog.Fatal("the coordinator may died")return}switchreply.TaskType{caseTaskTypeMap:// doMapTaskcaseTaskTypeReduce:// doReduceTaskcaseTaskTypeWait:time.Sleep(time.Second)caseTaskTypeFinish:returndefault:panic(fmt.Sprintf("unexpected jobType %v",reply.TaskType))}}}
typeCoordinatorstruct{// Your definitions here.NReduceint// number of reduceNMapint// number of mapFiles[]stringMapFinishedint// number of finished mapMapTaskLog[]int// log of map; 0: not allocated, 1: waiting, 2: finishedReduceFinishedint// number of finished reduceReduceTaskLog[]int// log of reduce; 0: not allocated, 1: waiting, 2: finishedmusync.Mutex// lock}// Your code here -- RPC handlers for the worker to call.func(c*Coordinator)AllocateTask(args*WorkerArgs,reply*WorkerReply)error{c.mu.Lock()deferc.mu.Unlock()ifc.MapFinished<c.NMap{// map task}elseifc.ReduceFinished<c.NReduce{// reduce task}else{reply.TaskType=3}returnnil}func(c*Coordinator)ReceiveFinishedMap(args*WorkerArgs,reply*WorkerReply)error{c.mu.Lock()deferc.mu.Unlock()c.MapFinished++// finished map count ++c.MapTaskLog[args.MapTaskNumber]=2returnnil}func(c*Coordinator)ReceiveFinishedReduce(args*WorkerArgs,reply*WorkerReply)error{c.mu.Lock()deferc.mu.Unlock()c.ReduceFinished++// finished reduce count ++c.ReduceTaskLog[args.ReduceTaskNumber]=2returnnil}