
TezChild get a task from containerReporter
ListenableFutureContainerReporter.callInternalgetTaskFuture = executor.submit(containerReporter); containerTask = getTaskFuture.get();
umbilical is umbilical to App Master.
@Override
protected ContainerTask callInternal() throws Exception {
containerTask = umbilical.getTask(containerContext);
TezChild.run
After retrieve a ContainerTask object, create a TezTaskRunner2 object and call run method of it.
TezTaskRunner2 taskRunner = new TezTaskRunner2(defaultConf, childUGI,
localDirs, containerTask.getTaskSpec(), appAttemptNumber,
serviceConsumermetadata, serviceProviderEnvMap, startedInputsMap, taskReporter,
executor, objectRegistry, pid, executionContext, memAvailable, updateSysCounters,
hadoopShim, sharedExecutor);
TezTaskRunner2. constructor
create task.
this.task = new LogicalIOProcessorRuntimetask(taskSpec, appAttemptNumber, taskConf, localDirs,
umbilicalAndErrorHandler, serviceConsumermetadata, serviceProviderEnvMap, startedInputsMap,
objectRegistry, pid, executionContext, memAvailable, updateSysCounters, hadoopShim,
sharedExecutor == null ? localExecutor : sharedExecutor);
TezTaskRunner2. run
create a TaskRunner2Callable object and execute it in a thread.
public TaskRunner2Result run() {
taskRunnerCallable = new TaskRunner2Callable(task, ugi,
umbilicalAndErrorHandler);
future = executor.submit(taskRunnerCallable);
future.get();
TaskRunner2Callable
extends CallableWithNdc, and CallableWithNdc.call calls callInteranl, so finally callInternal() in TaskRunner2Callable is called.
public class TaskRunner2Callable extends CallableWithNdc
CallableWithNdc.call
@Override
public final T call() throws Exception {
NDC.inherit(ndcStack);
try {
return callInternal();
} finally {
NDC.clear();
}
}
TaskRunner2Callable.callInternal
The type of task is LogicalIOProcessorRuntimetask
@Override
task.initialize();
task.run();
task.close();
task.cleanup();
}
}
LogicalIOProcessorRuntimetask
constructor
define processorDescriptor in constructor
this.processorDescriptor = taskSpec.getProcessorDescriptor();LogicalIOProcessorRuntimetask.initialize
The type of processor is AbstractLogicalIOProcessor. In case of Hive the class name is org.apache.hadoop.hive.ql.exec.tez.TezProcessor.
this.processor = createProcessor(processorDescriptor.getClassName(), processorContext);run
public void run() throws Exception {
Preconditions.checkState(this.state.get() == State.INITED,
"Can only run while in INITED state. Current: " + this.state);
this.state.set(State.RUNNING);
processor.run(runInputMap, runOutputMap);
}
close
public void close() throws Exception {
processor.close();
}
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)