【hadoop代码笔记】Mapreduce shuffle过程之Map输出过程

一、概要描述

shuffle是MapReduce的一个核心过程,因此没有在前面的MapReduce作业提交的过程中描述,而是单独拿出来比较详细的描述。 根据官方的流程图示如下:

mapreduce shuffle

? 本篇文章中只是想尝试从代码分析来说明在map端是如何将map的输出保存下来等待reduce来取。 在执行每个map task时,无论map方法中执行什么逻辑,最终都是要把输出写到磁盘上。如果没有reduce阶段,则直接输出到hdfs上,如果有有reduce作业,则每个map方法的输出在写磁盘前线在内存中缓存。每个map task都有一个环状的内存缓冲区,存储着map的输出结果,默认100m,在每次当缓冲区快满的时候由一个独立的线程将缓冲区的数据以一个溢出文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有溢出文件做合并,被合并成已分区且已排序的输出文件。然后等待reduce task来拉数据。

二、 流程描述

  1. ?在child进程调用到runNewMapper时,会设置output为NewOutputCollector,来负责map的输出。
  2. ?在map方法的最后,不管经过什么逻辑的map处理,最终一般都要调用到TaskInputOutputContext的write方法,进而调用到设置的output即NewOutputCollector的write方法
  3. NewOutputCollector其实只是对MapOutputBuffer的一个封装,其write方法调用的是MapOutputBuffer的collect方法。
  4. MapOutputBuffer的collect方法中把key和value序列化后存储在一个环形缓存中,如果缓存满了则会调用startspill方法设置信号量,使得一个独立的线程SpillThread可以对缓存中的数据进行处理。
  5. SpillThread线程的run方法中调用sortAndSpill方法对缓存中的数据进行排序后写溢出文件。
  6. 当map输出完成后,会调用output的close方法。
  7. 在close方法中调用flush方法,对剩余的缓存进行处理,最后调用mergeParts方法,将前面过程的多个溢出文件合并为一个。

Mapreduce shuffle过程之Map输出过程代码流程

Mapreduce shuffle过程之Map输出过程代码流程

三、代码详细

1 ?MapTask的runNewMapper方法

注意到有这样一段代码。即当job中只有map没有reduce的时候,这个rg.apache.hadoop.mapreduce.RecordWriter类型的对象 output是一Outputformat中定义的writer,即直接写到输出中。如果是有Reduce,则output是一个NewOutputCollector类型输出。

和其他的RecordWriter一样,NewOutputCollector也继承自RecordWriter抽象类。除了一个close方法释放资源外,该抽象类定义的最主要的方法就一个void write(K key, V value)。即写入key,value。

2. Mapper的run方法,对每个输出执行map方法。

3. ?Mapper的map方法,默认是直接把key和value写入

一般使用中会做很多我们需要的操作,如著名的wordcount中,把一行单词切分后,数一(value都设为one = new IntWritable(1)),但最终都是要把结果写入。即调用context.write(key,value)

4.?TaskInputOutputContext的write方法。调用的是contex中的RecordWriter的write方法。即调用的是NewOutputCollector的write方法。

5.?NewOutputCollector的write方法。

从方法名上不难看出提供写数据的是MapOutputCollector<K,V>类型的 collector对象.从NewOutputCollector的构造函数中看到collector的初始化。

6.?MapOutputBuffer的构造函数,在了解MapOutputBuffer的collect方法前,先了解下期构造函数,看做了哪些初始化。

7.?MapOutputBuffer的collect方法。

参数partition是partitioner根据key计算得到的当前key value属于的partition索引。写key和value写入缓存,当缓存满足spill条件时,通过调用startSpill方法设置变量并通过spillReady.signal(),通知spillThread;并等待spill结束(通过spillDone.await()等待)缓冲区的作用是批量收集map结果,减少磁盘IO的影响。key/value对以及Partition的结果都会被写入缓冲区。写入之前,key与value值都会被序列化成字节数组。kvindices保持了记录所属的分区,key在缓冲区开始的位置和value在缓冲区开始的位置,通过kvindices,可以在缓冲区中找到对应的记录。

输出缓冲区中,和kvstart,kvend和kvindex对应的是bufstart,bufend和bufmark。这部分还涉及到变量bufvoid,用于表明实际使用的缓冲区结尾和变量bufmark,用于标记记录的结尾。需要bufmark,是因为key或value的输出是变长的。

Key Value序列化后缓存

Key Value序列化后缓存



8.?MapOutputBuffer.BlockingBuffer的reset()方法.

如果key串行化后出现bufindex < keystart,那么会调用BlockingBuffer的reset方法。原因是在spill的过程中需要对<key,value>排序,这种情况下,传递给RawComparator的必须是连续的二进制缓冲区,通过BlockingBuffer.reset方法当发现key的串行化结果出现不连续的情况时,会把bufvoid设置为bufmark,缓冲区开始部分往后挪,然后将原来位于bufmark到bufvoid出的结果,拷到缓冲区开始处,这样的话,key串行化的结果就连续存放在缓冲区的最开始处。

BlockingBuffer.reset方法

BlockingBuffer.reset方法

bufstart前面的缓冲区如果不能放下整个key串行化的结果,,处理的方式是将bufindex置0,然后调用BlockingBuffer内部的out的write方法直接输出,这实际调用了Buffer.write方法,会启动spill过程,最终会成功写入key串行化的结果。

9. ?MapOutputBuffer.Buffer的write方法。在key和value序列列化的时候,被调用写到缓存中。如果spill线程正在把缓存的数据写溢出文件,则阻塞。

 

buffull和wrap条件说明

buffull和wrap条件说明

如图,对bufful和wrap条件进行说明: 在上面两种情况下,即情况1和情况2,

buffull条件判断为从下次写指针的位置bufindex到缓存结束bufvoid的空间是否有足够的空间容纳写的内容,wrap是图中白颜色部分的空间(前后空白合在一起)是否比输入大,如果是,wrap为true; 情况3和情况4中,

buffull判断bufindex到bufstart的空间是否满足条件,而wrap肯定是false。 条件(buffull && !wrap)满足时,目前的空间不够一次写。

10.?MapOutputBuffer 的spillSingleRecord方法。如果在collect方法中处理缓存失败,则直接把这条记录些到spill文件中。对应单条记录即使设置了combiner也不用。如果记录非常大,内存缓冲区不能容下这么大的数据量,抛MapBufferTooSmallException异常,直接写文件不用写缓存。

11.MapOutputBuffer的startSpill。唤醒等待spillReady的线程。

12.?SpillThread的run方法。

该Thread会检查内存中的输出缓存区,在满足一定条件的时候将缓冲区中的内容spill到硬盘上。这是一个标准的生产者-消费者模型,MapTask的collect方法是生产者,spillThread是消费者,它们之间同步是通过spillLock(ReentrantLock)和spillLock上的两个条件变量(spillDone和spillReady)完成的。当kvstart == kvend条件成立时,表示没有要spill的记录。

13..MapOutputBuffer的sortAndSpill() ?方法

SpillThread线程的run方法中调用sortAndSpill把缓存中的输出写到格式为+ “/spill” + spillNumber + “.out”的spill文件中。索引(kvindices)保持在spill{spill号}.out.index中,数据保存在spill{spill号}.out中

创建SpillRecord记录,输出文件和IndexRecord记录,然后,需要在kvoffsets上做排序,排完序后顺序访问kvoffsets,也就是按partition顺序访问记录。按partition循环处理排完序的数组,如果没有combiner,则直接输出记录,否则,调用combineAndSpill,先做combin然后输出。循环的最后记录IndexRecord到SpillRecord。

14 MapOutputBuffer的compare方法和swap方法

MapOutputBuffer实现了IndexedSortable接口,从接口命名上就可以猜想到,这个排序不是移动数据,而是移动数据的索引。在这里要排序的其实是kvindices对象,通过移动其记录在kvoffets上的索引来实现。

如图,表示了写磁盘前Sort的效果。kvindices保持了记录所属的(Reduce)分区,key在缓冲区开始的位置和value在缓冲区开始的位置,通过kvindices,我们可以在缓冲区中找到对应的记录。kvoffets用于在缓冲区满的时候对kvindices的partition进行排序,排完序的结果将输出到输出到本地磁盘上,其中索引(kvindices)保持在spill{spill号}.out.index中,数据保存在spill{spill号}.out中。通过观察MapOutputBuffer的compare知道,先是在partition上排序,然后是在key上排序。

kvindices在kvoffets上排序

kvindices在kvoffets上排序


15. MapOutputBuffer的flush() ?方法

Mapper的结果都已经collect了,需要对缓冲区做一些最后的清理,调用flush方法,合并spill{n}文件产生最后的输出。先等待可能的spill过程完成,然后判断缓冲区是否为空,如果不是,则调用sortAndSpill,做最后的spill,然后结束spill线程.

16.MapTask.MapOutputBuffer的mergeParts()方法.

从不同溢写文件中读取出来的,然后再把这些值加起来。因为merge是将多个溢写文件合并到一个文件,所以可能也有相同的key存在,在这个过程中如果配置设置过Combiner,也会使用Combiner来合并相同的key。?mapreduce让每个map只输出一个文件,并且为这个文件提供一个索引文件,以记录每个reduce对应数据的偏移量。

合并前后index文件和spill文件的结构图

merge最终生成一个spill.out和spill.out.index文件

merge最终生成一个spill.out和spill.out.index文件

从前面的分析指导,多个partition的都在一个输出文件中,但是按照partition排序的。即把maper输出按照partition分段了。一个partition对应一个reducer,因此一个reducer只要获取一段即可。

完。

参考:

参考并补充了http://caibinbupt.iteye.com/blog/401374文章中关于内存中索引结构的分析。谢谢。

原创文章。为了维护文章的版本一致、最新、可追溯,转载请注明: 转载自idouba

本文链接地址: 【hadoop代码笔记】Mapreduce shuffle过程之Map输出过程


, , ,

2 Responses to 【hadoop代码笔记】Mapreduce shuffle过程之Map输出过程

  1. douma 2014年2月12日 at 下午12:37 #

    辛苦啦!希再努力!

  2. try 2014年2月9日 at 下午3:07 #

    谢谢。

发表评论