【hadoop代码笔记】Mapreduce shuffle过程之Reduce获取mapper输出

一、概要描述

上篇博文从代码分析来说明在map端是如何将map的输出保存下来等待reduce来取,在本篇文章中将重点分析reduce端的代码,即Reduce端如何在ReduceTask执行之前把其需要的输出拷贝过来,然后作为reduce的输入进行执行。本篇文章中的大部分内容在?hadoop作业提交之Child启动reduce任务一文中有过描述。本篇只是着重介绍Reduce获取输入部分内容。即reduce task在执行之前的不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge,也最终形成一个文件作为reduce task的输入文件。

Reduce需要集群上若干个map任务的输出作为其特殊的分区文件。Map 任务完成后,会通知其父tasktracker状态更新,然后tasktracker通知jobtracker。通过心跳机制来完成。因此jobtracker知道map输出和tasktracker之间的映射关系。Reducer的一个getMapCompletionEvents线程定期询问jobtracker以便获取map输出位置。每个map任务完成的时间可能不同,因此只要有一个任务完成,reduce任务就开始复制其输出。这就是reduce任务的复制阶段。其实是启动若干个MapOutputCopier线程来复制完所有map输出。在复制完成后reduce任务进入排序阶段。这个阶段将由LocalFSMerger或InMemFSMergeThread合并map输出,维持其顺序排序。在reduce阶段,对已排序输出的每个键都要调用reduce函数,此阶段的输出直接写到文件系统,一般为HDFS上。(如果采用HDFS,由于tasktracker节点也是DataNoe,所以第一个块副本将被写到本地磁盘。 即数据本地化)

二、 流程描述

1.在ReduceTak中 构建ReduceCopier对象,调用其fetchOutputs方法。
2. 在ReduceCopier的fetchOutputs方法中分别构造几个独立的线程。相互配合,并分别独立的完成任务。
2.1 GetMapEventsThread线程通过RPC询问TaskTracker,对每个完成的Event,获取maptask所在的服务器地址,即MapTask输出的地址,构造URL,加入到mapLocations,供copier线程获取。
2.2构造并启动若干个MapOutputCopier线程,通过http协议,把map的输出从远端服务器拷贝的本地,如果可以放在内存中,则存储在内存中调用,否则保存在本地文件。
2.3LocalFSMerger对磁盘上的map 输出进行归并。
2.4nMemFSMergeThread对内存中的map输出进行归并。
3.根据拷贝到的map输出构造一个raw keyvalue的迭代器,作为reduce的输入。
4. 调用runNewReducer方法中根据配置的Reducer类构造一个Reducer实例和运行的上下文。并调用reducer的run方法来执行到用户定义的reduce操作。
5.在Reducer的run方法中从上下文中取出一个key和该key对应的Value集合(Iterable<VALUEIN>类型),调用reducer的reduce方法进行处理。
6. Recuer的reduce方法是用户定义的处理数据的方法,也是用户唯一需要定义的方法。

Child启动reduce任务

Child启动reduce任务

三、代码详细

1.Child的main方法每个task进程都会被在单独的进程中执行,这个方法就是这些进程的入口方法。Reduce和map一样都是由该main函数调用。所以此处不做描述,详细见上节Child启动map任务

2. ReduceTask的run方法。在Child子进程中被调用,执行用户定义的Reduce操作。前面代码逻辑和MapTask类似。通过TaskUmbilicalProtocol向tasktracker上报执行进度。开启线程向TaskTracker上报进度,根据task的不同动作要求执行不同的方法,如jobClean,jobsetup,taskCleanup。对于部分的了解可以产看taskTracker获取Task文章中的JobTracker的 heartbeat方法处的详细解释。不同于map任务,在执行reduce任务前,需要把map的输出从map运行的tasktracker上拷贝到reducer运行的tasktracker上。

3. ReduceCopier类的fetchOutputs方法。该方法负责将map的输出拷贝的reduce端进程处理。从代码上看,启动了一个LocalFSMerger、InMemFSMergeThread、 GetMapEventsThread 和若干个MapOutputCopier线程。几个独立的线程。相互配合,并分别独立的完成任务。


 

4. MapOutputCopier线程的run方法。从scheduledCopies(List<MapOutputLocation>)中取出对象来调用copyOutput方法执行拷贝。通过http协议,把map的输出从远端服务器拷贝的本地,如果可以放在内存中,则存储在内存中调用,否则保存在本地文件。

5.MapOutputCopier线程的copyOutput方法。map的输出从远端map所在的tasktracker拷贝到reducer任务所在的tasktracker。

5.ReduceCopier.MapOutputCopier的getMapOutput方法,真正执行拷贝动作的方法,通过http把远端服务器上map的输出拷贝到本地。

6.ReduceTask.ReduceCopier.MapOutputCopier的shuffleInMemory方法。根据上一方法当map的输出可以在内存中存储时会调用该方法。

7.ReduceTask.ReduceCopier.MapOutputCopier的shuffleToDisk 方法把map输出拷贝到本地磁盘。当map的输出不能再内存中存储时,调用该方法。

8.LocalFSMerger线程的run方法。Merge map输出的本地拷贝。

9.InMemFSMergeThread线程的run方法。

10. InMemFSMergeThread线程的doInMemMerge方法,

11.ReduceCopier.GetMapEventsThread线程的run方法。通过RPC询问TaskTracker,对每个完成的Event,获取maptask所在的服务器地址,即MapTask输出的地址,构造URL,加入到mapLocations,供copier线程获取。

12.ReduceCopier.GetMapEventsThread线程的getMapCompletionEvents方法。通过RPC询问TaskTracker,对每个完成的Event,获取maptask所在的服务器地址,构造URL,加入到mapLocations。

13.ReduceTask.ReduceCopier的createKVIterator方法,从拷贝到的map输出创建RawKeyValueIterator,作为reduce的输入。

14.ReduceTask的runNewReducer方法。根据配置构造reducer以及其运行的上下文,调用reducer的reduce方法。

 

15.抽象类Reducer的run方法。从上下文中取出一个key和该key对应的Value集合(Iterable<VALUEIN>类型),调用reducer的reduce方法进行处理。

16.Reducer类的reduce,是用户一般会覆盖来执行reduce处理逻辑的方法。

 

完。

相关文章:

原创文章。为了维护文章的版本一致、最新、可追溯,转载请注明: 转载自idouba

本文链接地址: 【hadoop代码笔记】Mapreduce shuffle过程之Reduce获取mapper输出


, , ,

No comments yet.

发表评论