
Inbox 结构:
/**
 * Mailbox for a single RPC endpoint: queues the messages addressed to `endpoint`
 * (registered under `endpointName`).
 *
 * Every mutable field below is annotated `@GuardedBy("this")`, i.e. it must only be
 * accessed while holding this Inbox's monitor.
 * Excerpt only: the message-processing methods are elided.
 */
private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
  extends Logging {

  inbox => // Give this an alias so we can use it more clearly in closures.

  // Pending messages for the endpoint; `add` appends, so they are kept in insertion order.
  @GuardedBy("this")
  protected val messages = new java.util.LinkedList[InboxMessage]()

  // NOTE(review): the flags/counter below are used by the elided processing code;
  // their exact semantics are not shown in this excerpt.
  @GuardedBy("this")
  private var stopped = false

  @GuardedBy("this")
  private var enableConcurrent = false

  @GuardedBy("this")
  private var numActiveThreads = 0

  // onStart should be the first message to process, so it is enqueued in the constructor.
  inbox.synchronized {
    // In an executor's Inbox, this OnStart message is what leads to the backend's onStart().
    messages.add(OnStart)
  }

  // ...... (remaining members elided in this excerpt)
}
补充说明:在 RPC 通信中,每个 RpcEndpoint(而不是每条消息)都有自己的生命周期,如下方源码注释所述:
constructor -> onStart -> receive* -> onStop
// Base trait for Spark RPC endpoints (body elided in this excerpt).
// Per the Spark source comment, an endpoint's lifecycle is:
//   constructor -> onStart -> receive* -> onStop
private[spark] trait RpcEndpoint {......}
Executor 收件箱中的 OnStart 消息,最终触发的就是 CoarseGrainedExecutorBackend 对象的 onStart 方法:
/**
 * Lifecycle hook run when this endpoint starts.
 *
 * First resolves the resources for this executor (calling `exitExecutor` with code 1 if
 * that fails). It then asynchronously looks up the driver endpoint at `driverUrl`, stores
 * it in `driver`, and asks it to register this executor.
 *
 * When the ask succeeds, the backend sends itself `RegisteredExecutor`. When the lookup or
 * the ask fails, it calls `exitExecutor` with code 1 and `notifyDriver = false`.
 */
override def onStart(): Unit = {
  logInfo(s"Connecting to driver: $driverUrl")

  try {
    _resources = parseOrFindResources(resourcesFileOpt)
  } catch {
    case NonFatal(e) =>
      exitExecutor(1, s"Unable to create executor due to ${e.getMessage}", e)
  }

  // Look up the driver, remember it, then send the registration request.
  // Recording the ref is cheap, so the continuation can run on ThreadUtils.sameThread.
  val registration = rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { driverRef =>
    driver = Some(driverRef)
    driverRef.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
      extractAttributes, _resources, resourceProfile.id))
  }(ThreadUtils.sameThread)

  registration.onComplete {
    case Failure(cause) =>
      exitExecutor(1, s"Cannot register with driver: $driverUrl", cause, notifyDriver = false)
    case Success(_) =>
      // Registration accepted: continue in receive() via RegisteredExecutor.
      self.send(RegisteredExecutor)
  }(ThreadUtils.sameThread)
}
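Executor 发出注册请求后,由 Driver 端接收并处理。需要注意的是,Driver 运行在一个线程中,它并不是通过所谓“环境变量” SparkContext 来接收请求的。实际处理请求的是 SparkContext 创建的 CoarseGrainedSchedulerBackend,准确地说是其内部的 Driver 端 RPC 端点,代码如下所示:
Driver 处理 Executor 的注册请求并响应: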
// Driver side: handles an executor's registration request. This case clause is excerpted
// from a handler inside CoarseGrainedSchedulerBackend (presumably the driver endpoint's
// receiveAndReply — confirm against the full source).
// Outcomes:
//  - executor ID already registered   -> context.sendFailure(IllegalStateException)
//  - node or executor is blacklisted  -> context.sendFailure(IllegalStateException)
//  - otherwise                        -> record the executor, update the counters, post
//                                        SparkListenerExecutorAdded, then reply true
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
attributes, resources, resourceProfileId) =>
// Reject a second registration under an ID that is already known.
if (executorDataMap.contains(executorId)) {
context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId"))
} else if (scheduler.nodeBlacklist.contains(hostname) ||
isBlacklisted(executorId, hostname)) {
// If the cluster manager gives us an executor on a blacklisted node (because it
// already started allocating those resources before we informed it of our blacklist,
// or if it ignored our blacklist), then we reject that executor immediately.
logInfo(s"Rejecting $executorId as it has been blacklisted.")
context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId"))
} else {
// If the executor's rpc env is not listening for incoming connections, `hostPort`
// will be null, and the client connection should be used to contact the executor.
val executorAddress = if (executorRef.address != null) {
executorRef.address
} else {
context.senderAddress
}
logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
// Remember which executor ID lives at this RPC address.
addressToExecutorId(executorAddress) = executorId
// Add this executor's cores to the running total and count one more registration.
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
// Build one ExecutorResourceInfo per reported resource, keyed by resource name.
val resourcesInfo = resources.map{ case (k, v) =>
(v.name,
new ExecutorResourceInfo(v.name, v.addresses,
// tell the executor it can schedule resources up to numParts times,
// as configured by the user, or set to 1 as that is the default (1 task/resource)
taskResourceNumParts.getOrElse(v.name, 1)))
}
val data = new ExecutorData(executorRef, executorAddress, hostname,
0, cores, logUrlHandler.applyPattern(logUrls, attributes), attributes,
resourcesInfo, resourceProfileId)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
// NOTE(review): assumes executor IDs are numeric strings; String.toInt throws
// NumberFormatException otherwise.
if (currentExecutorIdCounter < executorId.toInt) {
currentExecutorIdCounter = executorId.toInt
}
// One pending executor request has now been fulfilled.
if (numPendingExecutors > 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
// Note: some tests expect the reply to come after we put the executor in the map
// Reply true: the registration succeeded.
context.reply(true)
}
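当 Driver 对注册请求回复成功(context.reply(true))后,Executor 端的 ask 调用成功完成,CoarseGrainedExecutorBackend 随即给自己发送 RegisteredExecutor 消息;收到该消息后,Executor 端创建 Executor 对象: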
// Executor-side RPC endpoint that registers with the driver and runs the tasks it is sent.
// Excerpt only: members marked "......" are elided.
private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
bindAddress: String,
hostname: String,
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv,
resourcesFileOpt: Option[String],
resourceProfile: ResourceProfile)
extends IsolatedRpcEndpoint with ExecutorBackend with Logging {
......
override def receive: PartialFunction[Any, Unit] = {
// Registration with the driver succeeded.
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
// Create the Executor that LaunchTask (below) uses to run tasks.
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
resources = _resources)
// Notify the driver with a LaunchedExecutor message carrying this executor's ID.
// NOTE(review): driver.get throws NoSuchElementException if driver is None; that is
// NonFatal, so it would end in the exitExecutor call below.
driver.get.send(LaunchedExecutor(executorId))
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}
// Decode a task sent by the driver, record its resources, and hand it to the executor.
// Exits if no Executor has been created yet.
case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
taskResources(taskDesc.taskId) = taskDesc.resources
executor.launchTask(this, taskDesc)
}
......
}
}
至此,Spark 的计算环境已经就绪:NodeManager、ResourceManager、ApplicationMaster、Driver 和 Executor 均已启动。
最后附上提交流程图:
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)