Tag Archives | hadoop

【hadoop代码笔记】hadoop作业提交之TaskTracker 启动task

一、概要描述

上篇博文描 述了TaskTracker从Jobtracker如何从JobTracker获取到要执行的Task。在从JobTracker获取到 LaunchTaskAction后,执行addToTaskQueue方法来把要执行的Task加入到queue。在本篇博文中,我们来关注下该方法 后,TaskTracker怎么来处理这些Task。

实际上,TaskTracker初始化时,会初始化并启动两个TaskLauncher类型的线程,mapLauncher,reduceLauncher。在TaskTracker从JobTracher获取到任务后,对应的会把任务添加到两个 TaskLauncher的Queue中,其实是TaskLauncher维护的一个列表List<TaskInProgress> tasksToLaunch。
TaskLauncher线程一直会定时检查TaskTracher上面有slot开业运行新的Task,则启动 Task。在这个过程中,先把task运行需要的文件解压到本地,并创建根据Task类型(Map或者Reduce)创建一个TaskRunner线程, 在TaskRunner中JvmManager调用JvmManagerForType、JvmRunner来启动一个java进程来执行Map或Reduce任务。

本文只是介绍到启动一个java进程,至于是什么样的java进程,对于maptask和reducetask分别是怎么执行的,在后面的child启动maptask,和child启动reducetask 会比较详细的介绍。

二、 流程描述

1.tasktracker的offerService方法获取到要执行的task后调用addToTaskQueue方法,其实是调用taskrunner的addToTaskQueue方法
2.TaskLauncher内部维护了一个List<TaskInProgress> tasksToLaunch,只是把task加入到该集合中
3.taskLauncher是一个线程,在其run方法中从tasksToLaunch集合中取出task来执行,调用Tasktracker的startNewTask方法启动task。
4. startNewtask方法中调用localizeJob方法把job相关的配置信息和要运行的jar拷贝到tasktracker本地,然后调用taskInProgress的launchTask方法来启动task。
5.TaskInProgress的launchTask方法先调用localizeTask(task把task相关的配置信息获取到本地。然后创建一个TaskRunner线程来启动task。
6.在TaskRunner的run方法中构建一个java命令的执行的条件,包括引用类,执行目录等,入口类是Child。然后调用JvmManager 的launchJvm方法来调用。
7.JvmManager 进而调用 JvmManagerForType的reapJvm,和spawnNewJvm 方法,发起调用。
8. 在JvmManagerForType的spawnNewJvm 方法中创建了一个JvmRunner线程类执行调用。
9. JvmRunner线程的run反复调用runChild方法来执行 一个命令行的调用。
[......]

阅读全文

Tags: , , ,

Comments { 0 }

【hadoop代码笔记】hadoop作业提交之Job初始化

一、概要描述

上一篇博文中主要描述了JobTracker和其几个服务(或功能)模块的接收到提交的job后的一些处理。其中很重要的一部分就作业的初始化。因为代码片段图的表达问题,本应该在上篇描述的内容,分开在本篇描述。
二、 流程描述

1. 代码也接上文的最后一个方法EagerTaskInitializationListener的 jobAdded方法把JobInProgress类型的job放到List<JobInProgress>类型的 jobInitQueue中,有个单独的线程会对新加入的每个job进行初始化,其初始化调用的方法就是JobInProgress的方法 initTasks。

2. 在JobInProgress的方法initTasks方法中,会根据传入的作业分片创建对应数量的TaskInProgress类型的maptask,同时会创建TaskInProgress类型的指定数量的reducetask。

3. TaskInProgress的初始化是由其构造函数和构造函数中调用的init方法完成的。
[......]

阅读全文

Tags: , , ,

Comments { 1 }

【hadoop代码笔记】hadoop作业提交之TaskTracker获取Task

一、概要描述

上上一篇博文上一篇博文中 分别描述了jobTracker和其服务(功能)模块初始化完成后,接收JobClient提交的作业,并进行初始化。本文着重描 述,JobTracker如何选择作业的Task分发到TaskTracker。本文只是描述一个TaskTracker如何从JobTracker获取 Task任务。Task任务在TaskTracker如何执行将在后面博文中描述。

二、 流程描述

1. TaskTracker在run中调用offerService()方法一直死循环的去连接Jobtracker,先Jobtracker发送心跳,发送自身状态,并从Jobtracker获取任务指令来执行。
2. 在JobTracker的heartbeat方法中,对于来自每一个TaskTracker的心跳请求,根据一定的作业调度策略调用assignTasks方法选择一定Task
3.Scheduler调用对应的LoadManager的canAssignMap方法和canAssignReduce方法以决定是否可以给 tasktracker分配任务。默认的是CapBasedLoad,全局平均分配。即根据全局的任务槽数,全局的map任务数的比值得到一个load系 数,该系数乘以待分配任务的tasktracker的最大map任务数,即是该tasktracker能分配得到的任务数。如果太tracker当前运行 的任务数小于可运行的任务数,则任务可以分配新作业给他。(图中缺失了LoadManager的表达,也画不下了,就不加了。在代码详细分析中有)
3. Scheduler的调用TaskSelector的obtainNewMapTask或者obtainNewReduceTask选择Task。
4. 在DefaultTaskSelector中选择Task的方法其实只是封装了JobInProgress的对应方法。
5. JobTracker根据得到的Task构造TaskTrackerAction设置到到HeartbeatResponse返回给TaskTracker。
6. TaskTracker中将来自JobTracker的任务加入到TaskQueue中等待执行。
[......]

阅读全文

Tags: , , ,

Comments { 1 }

【hadoop代码笔记hadoop作业提交之JobTracker等相关功能模块初始化

一、概要描述

本文重点描述在JobTracker一端接收作业、调度作业等几个模块的初始化工作。想过模块的介绍会在其他文章中比较详细的描述。受理作业提交在下一篇文章中会进行描述。

为了表达的尽可能清晰一点只是摘录出影响逻辑流转的主要代码。重点强调直接的协作调用,每个内部完成的逻辑(一直可以更细的说明、有些细节可能自己也理解并不深刻:-()在后续会描述。

主要包括JobTracker、TaskScheduler(此处以FairScheduler为例)、JobInProgressListener(以用的较多的EagerTaskInitializationListener为例)、TaskSelector(以最简单的DefaultTaskSelector为例)等。

二、 流程描述

1 JobTracker 的main函数中调用其startTracker方法。
2. 在mai函数中调用offerService,启动各个子服务项(大部分形态都是线程,有些是其他的初始化,如taskScheduler)
3 在startTracker中调用其构造函数,在构造函数中对其中重要的属性根据配置进行初始化。()个人感觉再构造中设置scheduler,在statTracker调用构造的下一句有给Scheduler传JobTracker的引用,有点不自然)
4. 在offerService()中启动taskSchedulerexpireTrackersThread retireJobsThread expireLaunchingTaskThread completedJobsStoreThread interTrackerServer等几个线程来共同完成服务。同时调用TaskScheduler的start方法进行初始化。
5. 在FairScheduler调度器的start方法中调用EagerTaskInitializationListenerr的start方法来初始化EagerTaskInitializationListener
6. . 在FairScheduler调度器的start方法中调用DefaultTaskSelector的start方法来初始化DefaultTaskSelector,因为该类实现的TaskSelector太简单,start方法里也没有做任何事情。
[......]

阅读全文

Tags: , , ,

Comments { 2 }

【hadoop代码笔记】hadoop作业提交之JobTracker接收作业提交

一、概要描述

上一篇博文中主要描述了JobTracker接收作业的几个服务(或功能)模块的初始化过程。本节将介绍这些服务(或功能)是如何接收到提交的job。本来作业的初始化也可以在本节内描述,但是涉及到JobInProgress的初始化过程放在一张图上太拥挤,就分开到下一篇文章中描述。

二、 流程描述

  1. ?JobClient通过RPC的方式向JobTracker提交作业
  2. 调用JobTracker的submitJob方法。该方法是JobTracker向外提供的供调用的提交作业的接口。
  3. submit方法中调用JobTracker的addJob方法。
  4. 在addJob方法中会把作业加入到集合中供调度,并会触发注册的JobInProgressListener的jobAdded事件。由上篇博文的jobtracker相关服务和功能的初始化的FairScheduler的start方法中看到,这里注册的是两个JobInProgressListener。分别是FairScheduler的内部类JobListener和EagerTaskInitializationListener
  5. ?FairScheduler的内部类JobListener响应jobAdded事件事件。只是为每个加入的Job创建一个用于FairScheduler调度用的JobInfo对象,并将其和job的对应的存储在Map<JobInProgress, JobInfo> infos集合中。
  6. ?EagerTaskInitializationListener响应jobAdded事件事件。jobAdded 只是简单的把job加入到一个List<JobInProgress>类型的 jobInitQueue中。并不直接对其进行初始化,对其中的job的处理由另外线程JobInitManager来做。该线程,一直检查 jobInitQueue是否有作业,有则拿出来从线程池中取一个线程InitJob处理。关于作业的初始化过程专门在下一篇文章中介绍。

[......]

阅读全文

Tags: , , ,

Comments { 0 }