【hadoop代码笔记】hadoop作业提交之TaskTracker获取Task

一、概要描述

上上一篇博文上一篇博文中 分别描述了jobTracker和其服务(功能)模块初始化完成后,接收JobClient提交的作业,并进行初始化。本文着重描 述,JobTracker如何选择作业的Task分发到TaskTracker。本文只是描述一个TaskTracker如何从JobTracker获取 Task任务。Task任务在TaskTracker如何执行将在后面博文中描述。

二、 流程描述

1. TaskTracker在run中调用offerService()方法一直死循环的去连接Jobtracker,先Jobtracker发送心跳,发送自身状态,并从Jobtracker获取任务指令来执行。
2. 在JobTracker的heartbeat方法中,对于来自每一个TaskTracker的心跳请求,根据一定的作业调度策略调用assignTasks方法选择一定Task
3.Scheduler调用对应的LoadManager的canAssignMap方法和canAssignReduce方法以决定是否可以给 tasktracker分配任务。默认的是CapBasedLoad,全局平均分配。即根据全局的任务槽数,全局的map任务数的比值得到一个load系 数,该系数乘以待分配任务的tasktracker的最大map任务数,即是该tasktracker能分配得到的任务数。如果太tracker当前运行 的任务数小于可运行的任务数,则任务可以分配新作业给他。(图中缺失了LoadManager的表达,也画不下了,就不加了。在代码详细分析中有)
3. Scheduler的调用TaskSelector的obtainNewMapTask或者obtainNewReduceTask选择Task。
4. 在DefaultTaskSelector中选择Task的方法其实只是封装了JobInProgress的对应方法。
5. JobTracker根据得到的Task构造TaskTrackerAction设置到到HeartbeatResponse返回给TaskTracker。
6. TaskTracker中将来自JobTracker的任务加入到TaskQueue中等待执行。

hadoop_mapreduce_tasktracker_retrieve_task

TaskTracker获取Task

 

三、代码详细

1. TaskTracker的入口函数main

2. TaskTracker的构造函数

3. TaskTracker的initialize方法,完成TaskTracker的初始化工作。

主要流程

1)检查可以创建本地文件夹
2)清理或者初始化需要用到的实例集合变量
3)初始化RPC服务器,接受task的请求。
4)清除临时文件
5)jobtracker的代理,负责处理和jobtracker的交互,通过RPC方式。
6)一个线程,获取map完成事件。
7)初始化内存管理
8)分别启动map和reduce的tasklauncher

4. TaskTracker run方法,在其中一直尝试执行offerService方法

5. TaskTracker 的offerService方法

1) 通过RPC调用获得Jobtracker的系统目录。
2) 发送心跳并且获取Jobtracker的应答
3) 从JobTrackeer的应答中获取指令
4) 不同的指令类型执行不同的动作
5) 对于要launch的task加入到taskQueue中去
6) 对于清理动作,加入待清理的task集合,会有线程自动清理
7) 杀死那些过久未反馈进度的task
8) 当磁盘空间不够时,杀死某些task以腾出空间

6. TaskTracker的 transmitHeartBeat方法,定时向JobTracker发心跳。其实是通过RPC的方式向调用Jobtracker的heartbeat方法。

6. JobTracker的 heartbeat方法。Jobtracker 接受并处理 tasktracker上报的状态,在返回的应答信息中指示tasktracker完成启停job或启动某个task的动作。

动作类型类 描述
CommitTaskAction 指示Task保存输出,即提交
KillJobAction 杀死属于这个Job的任何一个Task
KillTaskAction 杀死指定的Task
LaunchTaskAction 开启某个task
ReinitTrackerAction 重新初始化taskTracker

主要流程如下:
1) acceptTaskTracker(status)方法通过查询inHostsList(status) && !inExcludedHostsList确认Tasktracker是否在JobTracker的允许列表中。
2) 当得知TaskTracker重启的标记,从jobtracker的潜在故障名单中移除该tasktracker
3) 如果initialContact为否表示这次心跳请求不是该taskTracker第一次连接jobtracker,但是如果在jobtracker的 trackerToHeartbeatResponseMap记录中没有之前的响应记录,则说明发生了笔记严重的错误。发送指令给tasktracker 要求其重新初始化。
4) 如果这是有问题的tasktracker重新接回来的第一个心跳,则通知recoveryManager recoveryManager从的recoveredTrackers列表中移除该tracker以表示该tracker又正常的接回来了。
5) 如果initialContact != true 并且 revHeartbeatResponse != null表示上一个心跳应答存在,但是tasktracker表示第一次请求,则说上一个initialContact请求的应答丢失了,未传送到 tasktracker。则只是简单的把原来的应答重发一下即可。
6) 构造应答的Id,是递加的。
7) 处理心跳,其实就是在jobTracker端更新该tasktracker的状态
8) 检查tasktracker可以运行新的task
9) 调用JobTracker配置的taskSceduler来调度task给对应的TaskTracker。从submit到JobTracker的Job列表中选择每个job的每个Task,适合交给该TaskTracker调度的Task
10) 把分配的Task加入到expireLaunchingTasks,监视并处理其是否超时。
11) 根据调度器发获得要启动的task构造LaunchTaskAction,通知taskTracker启动这些task。
12) 把属于该tasktracker的,job已经结束的task加入到killTasksList,发送到tasktracker杀死。即结束那些在tasktracker上已经结束了的作业的task,不管作业是完成还失败。
13) 判定哪些作业需要清理的,构造Action加入到action列表中。trackerToJobsToCleanup是一个结合,当job gc的时候,调用 finalizeJob进而调用 addJobForCleanup 把作业加入到trackerToJobsToCleanup中
14) 判定那些task可以提交输出,构造action加入到action列表。
15) 计算下一次心跳的间隔,设置到应答消息中。
16) 把上面这些Action设置到response中返回。
17) 把本次应答保存到trackerToHeartbeatResponseMap中

7.FairScheduler的assignTasks方法。JobTracker就是调用该方法来实现作业的分配的。

主要流程如下:

1)分别计算可运行的maptask和reducetask总数
2)ClusterStatus 维护了当前Map/Reduce作业框架的总体状况。根据ClusterStatus计算得到获得map task的槽数,reduce task的槽数。
3)调用LoadManager方法决定是否可以为该tasktracker分配任务(默认CapBasedLoadManager方法根据全局的任务槽数, 全局的map任务数的比值得到一个load系数,该系数乘以待分配任务的tasktracker的最大map任务数,即是该tasktracker能分配 得到的任务数。如果太tracker当前运行的任务数小于可运行的任务数,则任务可以分配新作业给他)
4)从job列表中找出那些job需要运行map或reduce任务,加到List<JobInProgress> candidates集合中
5)对candidates集合中的job排序,对每个job调用taskSelector的obtainNewMapTask或者 obtainNewReduceTask方法获取要执行的task。把所以的task放到task集合中返回。从而实现了作业Job的任务Task分配。
6)并对candidates集合中的每个job,更新Jobinfo信息,即其正在运行的task数,需要运行的task数,以便其后续调度用。

 

8.CapBasedLoadManager的canAssignMap方法和canAssignReduce方法。一 种简单的算法在FairScheduler中用来决定是否可以给某个tasktracker分配maptask或者reducetask。总体思路是对于 某种类型的task,map或者reduce,考虑jobtracker管理的mapreduce集群全部的任务数,和全部的任务槽数,和该 tasktracker上面当前的任务数,以决定是否给他分配任务。如对于maptask,根据全局的任务槽数,全局的map任务数的比值得到一个 load系数,该系数乘以待分配任务的tasktracker的最大map任务数,即是该tasktracker能分配得到的任务数。如果太 tracker当前运行的任务数小于可运行的任务数,则任务可以分配新作业给他。reducetask同理。即尽量做到全局平均。

9.DefaultTaskSelector继承自TaskSelector,其两个方法其实只是对jobInprogress得封装,没有做什么特别的事情。

10. JobInProgress的obtainNewMapTask方法。其实主要逻辑是在findNewMapTask方法中实现。

11 JobInProgress的findNewMapTask方法。
根据待派发Task的TaskTracker根据集群中的TaskTracker数量(clusterSize),运行TraskTracker的服务器数(numUniqueHosts),该Job中map task的平均进度(avgProgress),可以调度map的最大水平(距离其实),选择一个task执行。考虑到map的本地化。

12 JobInProgress的obtainNewReduceTask方法返回一个ReduceTask,实际调用的是findNewReduceTask方法。

13 JobInProgress的findNewReduceTask方法,为指定的TaskTracker选择Reduce task。不用考虑本地化。

14 TaskTracker 的addToTaskQueue方法。对于要launch的task加入到taskQueue中去,不同类型的Task有不同类型额launcher。

完。

原创文章。为了维护文章的版本一致、最新、可追溯,转载请注明: 转载自idouba

本文链接地址: 【hadoop代码笔记】hadoop作业提交之TaskTracker获取Task


, , ,

One Response to 【hadoop代码笔记】hadoop作业提交之TaskTracker获取Task

  1. gwgyk 2014年11月2日 at 下午2:20 #

    大神你好,请问在JobInProgress的findNewMapTask方法中,第50行到79行的这段代码,为什么会有“key = key.getParent();”这行语句?为什么还要遍历父节点中的TaskInProgress?DataNode的父节点不就是rack吗?难道rack上也会运行task?

    如果您有空麻烦能解决下我这个问题,非常感谢,我的邮箱:664548870@qq.com

发表评论