Tag Archives | shuffle

【hadoop代码笔记】Mapreduce shuffle过程之Reduce获取mapper输出

一、概要描述

上篇博文从代码分析来说明在map端是如何将map的输出保存下来等待reduce来取,在本篇文章中将重点分析reduce端的代码,即Reduce端如何在ReduceTask执行之前把其需要的输出拷贝过来,然后作为reduce的输入进行执行。本篇文章中的大部分内容在?hadoop作业提交之Child启动reduce任务一文中有过描述。本篇只是着重介绍Reduce获取输入部分内容。即reduce task在执行之前的不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge,也最终形成一个文件作为reduce task的输入文件。

Reduce需要集群上若干个map任务的输出作为其特殊的分区文件。Map 任务完成后,会通知其父tasktracker状态更新,然后tasktracker通知jobtracker。通过心跳机制来完成。因此jobtracker知道map输出和tasktracker之间的映射关系。Reducer的一个getMapCompletionEvents线程定期询问jobtracker以便获取map输出位置。每个map任务完成的时间可能不同,因此只要有一个任务完成,reduce任务就开始复制其输出。这就是reduce任务的复制阶段。其实是启动若干个MapOutputCopier线程来复制完所有map输出。在复制完成后reduce任务进入排序阶段。这个阶段将由LocalFSMerger或InMemFSMergeThread合并map输出,维持其顺序排序。在reduce阶段,对已排序输出的每个键都要调用reduce函数,此阶段的输出直接写到文件系统,一般为HDFS上。(如果采用HDFS,由于tasktracker节点也是DataNoe,所以第一个块副本将被写到本地磁盘。 即数据本地化)

二、 流程描述

1.在ReduceTak中 构建ReduceCopier对象,调用其fetchOutputs方法。
2. 在ReduceCopier的fetchOutputs方法中分别构造几个独立的线程。相互配合,并分别独立的完成任务。
2.1 GetMapEventsThread线程通过RPC询问TaskTracker,对每个完成的Event,获取maptask所在的服务器地址,即MapTask输出的地址,构造URL,加入到mapLocations,供copier线程获取。
2.2构造并启动若干个MapOutputCopier线程,通过http协议,把map的输出从远端服务器拷贝的本地,如果可以放在内存中,则存储在内存中调用,否则保存在本地文件。
2.3LocalFSMerger对磁盘上的map 输出进行归并。
2.4nMemFSMergeThread对内存中的map输出进行归并。
3.根据拷贝到的map输出构造一个raw keyvalue的迭代器,作为reduce的输入。
4. 调用runNewReducer方法中根据配置的Reducer类构造一个Reducer实例和运行的上下文。并调用reducer的run方法来执行到用户定义的reduce操作。
5.在Reducer的run方法中从上下文中取出一个key和该key对应的Value集合(Iterable<VALUEIN>类型),调用reducer的reduce方法进行处理。
6. Recuer的reduce方法是用户定义的处理数据的方法,也是用户唯一需要定义的方法。
[......]

阅读全文

Tags: , , ,

Comments { 0 }