一、jobMaster启动内容
private void startJobMasterServices() throws Exception {
//心跳 taskManage与resourceManage的交互 startHeartbeatServices(); // start the slot pool make sure the slot pool now accepts messages for this leader
//jobMaster内部有个槽池slotPool,而resourceMange内部有个slotManage slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor()); //TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start // try to reconnect to previously known leader
//与resourceManage建立连接 reconnectToResourceManager(new FlinkException("Starting JobMaster component.")); // job is ready to go, try to establish connection with resource manager // - activate leader retrieval for the resource manager // - on notification of the leader, the connection will be established and // the slot pool will start requesting slots
//与resourceMange建立连接之后,slotPool开始请求资源 resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener()); }
SlotPool 向 ResourceManager 请求资源
进入StandaloneLeaderRetrievalService
public void start(LeaderRetrievalListener listener) { checkNotNull(listener, "Listener must not be null."); synchronized (startStopLock) { checkState(!started, "StandaloneLeaderRetrievalService can only be started once."); started = true; // directly notify the listener, because we already know the leading JobManager's address
//核心,通知监听器找谁呢? listener.notifyLeaderAddress(leaderAddress, leaderId); } }
进入内部类
ResourceManagerLeaderListener
private class ResourceManagerLeaderListener implements LeaderRetrievalListener { @Override public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) { runAsync(
//主要方法 () -> notifyOfNewResourceManagerLeader( leaderAddress, ResourceManagerId.fromUuidOrNull(leaderSessionID))); } @Override public void handleError(final Exception exception) { handleJobMasterError(new Exception("Fatal error in the ResourceManager leader service", exception)); } }
最终跳到
private void connectToResourceManager() { assert(resourceManagerAddress != null); assert(resourceManagerConnection == null); assert(establishedResourceManagerConnection == null); log.info("Connecting to ResourceManager {}", resourceManagerAddress); resourceManagerConnection = new ResourceManagerConnection( log, jobGraph.getJobID(), resourceId, getAddress(), getFencingToken(), resourceManagerAddress.getAddress(), resourceManagerAddress.getResourceManagerId(), scheduledExecutorService); //获取连接之后,开始 resourceManagerConnection.start(); }
进入
public void start() { checkState(!closed, "The RPC connection is already closed"); checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started"); //1.创建注册对象 final RetryingRegistration<F, G, S> newRegistration = createNewRegistration(); if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
//2.开始注册 newRegistration.startRegistration(); } else { // concurrent start operation newRegistration.cancel(); } }
1.创建注册对象createNewRegistration();进入
private RetryingRegistration<F, G, S> createNewRegistration() {
//生成注册 RetryingRegistration<F, G, S> newRegistration = checkNotNull(generateRegistration()); CompletableFuture<Tuple2<G, S>> future = newRegistration.getFuture(); future.whenCompleteAsync( (Tuple2<G, S> result, Throwable failure) -> { if (failure != null) { if (failure instanceof CancellationException) { // we ignore cancellation exceptions because they originate from cancelling // the RetryingRegistration log.debug("Retrying registration towards {} was cancelled.", targetAddress); } else { // this future should only ever fail if there is a bug, not if the registration is declined onRegistrationFailure(failure); } } else { targetGateway = result.f0;
//注册成功 onRegistrationSuccess(result.f1); } }, executor); return newRegistration; }
generateRegistration 的具体实现在 JobMaster 类中
protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() { return new RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess>( log, getRpcService(), "ResourceManager", ResourceManagerGateway.class, getTargetAddress(), getTargetLeaderId(), jobMasterConfiguration.getRetryingRegistrationConfiguration()) { @Override protected CompletableFuture<RegistrationResponse> invokeRegistration( ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) { Time timeout = Time.milliseconds(timeoutMillis); //注册成功!! return gateway.registerJobManager( jobMasterId, jobManagerResourceID, jobManagerRpcAddress, jobID, timeout); } }; }
//2.开始注册
newRegistration.startRegistration();
//3.注册成功之后的逻辑
@Override protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) { runAsync(() -> { // filter out outdated connections //noinspection ObjectEquality if (this == resourceManagerConnection) { establishResourceManagerConnection(success); } }); }
establishResourceManagerConnection(success);
private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) { final ResourceManagerId resourceManagerId = success.getResourceManagerId(); // verify the response with current connection if (resourceManagerConnection != null && Objects.equals(resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) { log.info("JobManager successfully registered at ResourceManager, leader id: {}.", resourceManagerId); //通讯模块 final ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); final ResourceID resourceManagerResourceId = success.getResourceManagerResourceId(); establishedResourceManagerConnection = new EstablishedResourceManagerConnection( resourceManagerGateway, resourceManagerResourceId); //slotPool建立连接ResourceManage,请求资源 slotPool.connectToResourceManager(resourceManagerGateway); resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<Void>() { @Override public void receiveHeartbeat(ResourceID resourceID, Void payload) { resourceManagerGateway.heartbeatFromJobManager(resourceID); } @Override public void requestHeartbeat(ResourceID resourceID, Void payload) { // request heartbeat will never be called on the job manager side } });
进入
SlotPoolImpl类 connectToResourceManager
/**
 * Records the given ResourceManager gateway and sends every slot request that was waiting
 * for a ResourceManager connection to it.
 *
 * @param resourceManagerGateway gateway of the ResourceManager; must not be null
 */
public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {
    this.resourceManagerGateway = checkNotNull(resourceManagerGateway);

    // Flush all requests that were parked until a ResourceManager connection existed.
    waitingForResourceManager
        .values()
        .forEach(parkedRequest -> requestSlotFromResourceManager(resourceManagerGateway, parkedRequest));

    // Everything has been sent off, so nothing remains waiting.
    waitingForResourceManager.clear();
}
进入requestSlotFromResourceManager
private void requestSlotFromResourceManager( final ResourceManagerGateway resourceManagerGateway, final PendingRequest pendingRequest) { //远程调用,真正请求资源 CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot( jobMasterId, new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress), rpcTimeout); FutureUtils.whenCompleteAsyncIfNotDone( rmResponse, componentMainThreadExecutor, (Acknowledge ignored, Throwable failure) -> { // on failure, fail the request future if (failure != null) { slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure); } }); }
进入ResourceManager类requestSlot
public CompletableFuture<Acknowledge> requestSlot( JobMasterId jobMasterId, SlotRequest slotRequest, final Time timeout) { JobID jobId = slotRequest.getJobId(); JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId); if (null != jobManagerRegistration) { if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) { log.info("Request slot with profile {} for job {} with allocation id {}.", slotRequest.getResourceProfile(), slotRequest.getJobId(), slotRequest.getAllocationId()); try {
//resourceManage内部的slotManage向yarn的resourceManage申请资源 slotManager.registerSlotRequest(slotRequest); } catch (ResourceManagerException e) { return FutureUtils.completedExceptionally(e); }
进入
SlotManagerImpl
public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException { checkInit(); if (checkDuplicateRequest(slotRequest.getAllocationId())) { LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId()); return false; } else { PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest); pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest); try {
//申请 internalRequestSlot(pendingSlotRequest);
private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException { final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile(); OptionalConsumer.of(findMatchingSlot(resourceProfile)) .ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
//空闲存在的槽 .ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest)); }
进入
fulfillPendingSlotRequestWithPendingTaskManagerSlot
private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException { ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile(); Optional<PendingTaskManagerSlot> pendingTaskManagerSlotOptional = findFreeMatchingPendingTaskManagerSlot(resourceProfile); if (!pendingTaskManagerSlotOptional.isPresent()) {
//找资源 pendingTaskManagerSlotOptional = allocateResource(resourceProfile); } OptionalConsumer.of(pendingTaskManagerSlotOptional) .ifPresent(pendingTaskManagerSlot -> assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot)) .ifNotPresent(() -> { // request can not be fulfilled by any free slot or pending slot that can be allocated, // check whether it can be fulfilled by allocated slots if (failUnfulfillableRequest && !isFulfillableByRegisteredOrPendingSlots(pendingSlotRequest.getResourceProfile())) { throw new UnfulfillableSlotRequestException(pendingSlotRequest.getAllocationId(), pendingSlotRequest.getResourceProfile()); } }); }
allocateResource
private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile requestedSlotResourceProfile) { final int numRegisteredSlots = getNumberRegisteredSlots();
//挂起的资源 final int numPendingSlots = getNumberPendingTaskManagerSlots(); if (isMaxSlotNumExceededAfterAdding(numSlotsPerWorker)) { LOG.warn("Could not allocate {} more slots. The number of registered and pending slots is {}, while the maximum is {}.", numSlotsPerWorker, numPendingSlots + numRegisteredSlots, maxSlotNum); return Optional.empty(); } if (!defaultSlotResourceProfile.isMatching(requestedSlotResourceProfile)) { // requested resource profile is unfulfillable return Optional.empty(); } //分配资源 if (!resourceActions.allocateResource(defaultWorkerResourceSpec)) { // resource cannot be allocated return Optional.empty(); } PendingTaskManagerSlot pendingTaskManagerSlot = null; for (int i = 0; i < numSlotsPerWorker; ++i) { pendingTaskManagerSlot = new PendingTaskManagerSlot(defaultSlotResourceProfile); pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(), pendingTaskManagerSlot); } return Optional.of(Preconditions.checkNotNull(pendingTaskManagerSlot, "At least one pending slot should be created.")); }
allocateResource
/**
 * Validates that the call runs in the main thread and then delegates to
 * {@code startNewWorker}.
 *
 * @param workerResourceSpec resource spec of the worker to start
 * @return whatever {@code startNewWorker} returns for the given spec
 */
@Override
public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
    validateRunsInMainThread();

    final boolean workerRequested = startNewWorker(workerResourceSpec);
    return workerRequested;
}
startNewWorker 的具体实现类是 ActiveResourceManager
/**
 * Requests a new worker for the given resource spec.
 *
 * @param workerResourceSpec resource spec of the worker to request
 * @return always {@code true}
 */
@Override
public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
    // Hand the spec to requestNewWorker; this method reports success unconditionally.
    requestNewWorker(workerResourceSpec);

    return true;
}
进入requestNewWorker 请求工作节点
jobMaster工作结束
接下来是 YARN 启动容器的入口,详见下一节