首页 > 技术文章 > 【Flink提交流程源码】七、jobMaster启动

fi0108 2021-06-21 16:42 原文

一、JobMaster 启动内容

private void startJobMasterServices() throws Exception {
//心跳 taskManage与resourceManage的交互 startHeartbeatServices(); // start the slot pool make sure the slot pool now accepts messages for this leader
//jobMaster内部有个槽池slotPool,而resourceMange内部有个slotManage slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor()); //TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start // try to reconnect to previously known leader
//与resourceManage建立连接 reconnectToResourceManager(new FlinkException("Starting JobMaster component.")); // job is ready to go, try to establish connection with resource manager // - activate leader retrieval for the resource manager // - on notification of the leader, the connection will be established and // the slot pool will start requesting slots
//与resourceMange建立连接之后,slotPool开始请求资源 resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener()); }

  slotPool 向 ResourceManager 请求资源

进入StandaloneLeaderRetrievalService

public void start(LeaderRetrievalListener listener) {
		checkNotNull(listener, "Listener must not be null.");

		synchronized (startStopLock) {
			checkState(!started, "StandaloneLeaderRetrievalService can only be started once.");
			started = true;

			// directly notify the listener, because we already know the leading JobManager's address

//核心,通知监听器找谁呢? listener.notifyLeaderAddress(leaderAddress, leaderId); } }

  进入内部类

ResourceManagerLeaderListener

private class ResourceManagerLeaderListener implements LeaderRetrievalListener {

		@Override
		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
			runAsync(
//主要方法 () -> notifyOfNewResourceManagerLeader( leaderAddress, ResourceManagerId.fromUuidOrNull(leaderSessionID))); } @Override public void handleError(final Exception exception) { handleJobMasterError(new Exception("Fatal error in the ResourceManager leader service", exception)); } }

  最终跳到

private void connectToResourceManager() {
	// Preconditions: a target address is known and no connection exists yet.
	assert resourceManagerAddress != null;
	assert resourceManagerConnection == null;
	assert establishedResourceManagerConnection == null;

	log.info("Connecting to ResourceManager {}", resourceManagerAddress);

	// Addresses of this JobMaster (source) and of the ResourceManager (target).
	final String jobMasterAddress = getAddress();
	final String targetRmAddress = resourceManagerAddress.getAddress();

	resourceManagerConnection = new ResourceManagerConnection(
		log,
		jobGraph.getJobID(),
		resourceId,
		jobMasterAddress,
		getFencingToken(),
		targetRmAddress,
		resourceManagerAddress.getResourceManagerId(),
		scheduledExecutorService);

	// With the connection object in place, kick off the registration.
	resourceManagerConnection.start();
}

  进入

public void start() {
		checkState(!closed, "The RPC connection is already closed");
		checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");
//1.创建注册对象
		final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();

		if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
//2.开始注册 newRegistration.startRegistration(); } else { // concurrent start operation newRegistration.cancel(); } }

  

1. 创建注册对象：进入 createNewRegistration()
private RetryingRegistration<F, G, S> createNewRegistration() {
//生成注册 RetryingRegistration<F, G, S> newRegistration = checkNotNull(generateRegistration()); CompletableFuture<Tuple2<G, S>> future = newRegistration.getFuture(); future.whenCompleteAsync( (Tuple2<G, S> result, Throwable failure) -> { if (failure != null) { if (failure instanceof CancellationException) { // we ignore cancellation exceptions because they originate from cancelling // the RetryingRegistration log.debug("Retrying registration towards {} was cancelled.", targetAddress); } else { // this future should only ever fail if there is a bug, not if the registration is declined onRegistrationFailure(failure); } } else { targetGateway = result.f0;
//注册成功 onRegistrationSuccess(result.f1); } }, executor); return newRegistration; }

  

generateRegistration 的实现位于 JobMaster 类中：

 

 

protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() {
	// A RetryingRegistration towards the ResourceManager whose registration attempt is
	// the ResourceManagerGateway#registerJobManager RPC.
	return new RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess>(
		log,
		getRpcService(),
		"ResourceManager",
		ResourceManagerGateway.class,
		getTargetAddress(),
		getTargetLeaderId(),
		jobMasterConfiguration.getRetryingRegistrationConfiguration()) {

		@Override
		protected CompletableFuture<RegistrationResponse> invokeRegistration(
				ResourceManagerGateway rmGateway, ResourceManagerId unusedFencingToken, long timeoutMillis) {
			// Sends the registration request. The returned future carries the ResourceManager's
			// RegistrationResponse; it does not by itself mean the registration succeeded.
			return rmGateway.registerJobManager(
				jobMasterId,
				jobManagerResourceID,
				jobManagerRpcAddress,
				jobID,
				Time.milliseconds(timeoutMillis));
		}
	};
}

  

// 2. Start the registration (the call made in start() once the registration object has been created).
newRegistration.startRegistration();


// 3. Logic executed after a successful registration.
@Override
protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
	runAsync(() -> {
		// filter out outdated connections: only the currently active connection may proceed
		//noinspection ObjectEquality
		if (this != resourceManagerConnection) {
			return;
		}
		establishResourceManagerConnection(success);
	});
}

  

// Invoked from onRegistrationSuccess for the current connection:
establishResourceManagerConnection(success);
// Validates the registration response against the current connection, records the
// established connection, hands the gateway to the slotPool and starts heartbeat monitoring.
private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
		final ResourceManagerId resourceManagerId = success.getResourceManagerId();

		// verify the response with current connection
		if (resourceManagerConnection != null
				&& Objects.equals(resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) {

			log.info("JobManager successfully registered at ResourceManager, leader id: {}.", resourceManagerId);
			// Communication handle: the ResourceManager's RPC gateway obtained through the registration.
			final ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();

			final ResourceID resourceManagerResourceId = success.getResourceManagerResourceId();

			establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
				resourceManagerGateway,
				resourceManagerResourceId);
			// Connect the slotPool to the ResourceManager; it then sends its waiting slot requests there.
			slotPool.connectToResourceManager(resourceManagerGateway);

			// Monitor the ResourceManager via heartbeats; a received heartbeat is answered
			// with heartbeatFromJobManager.
			resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<Void>() {
				@Override
				public void receiveHeartbeat(ResourceID resourceID, Void payload) {
					resourceManagerGateway.heartbeatFromJobManager(resourceID);
				}

				@Override
				public void requestHeartbeat(ResourceID resourceID, Void payload) {
					// request heartbeat will never be called on the job manager side
				}
			});
			// NOTE(review): the article's excerpt ends here; the closing braces and any
			// else-branch of this method are not shown.

 进入

SlotPoolImpl 类的 connectToResourceManager：

public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {
	// Store the (non-null) gateway of the newly connected ResourceManager.
	final ResourceManagerGateway gateway = checkNotNull(resourceManagerGateway);
	this.resourceManagerGateway = gateway;

	// Flush every slot request that was parked while no ResourceManager was connected.
	for (PendingRequest parkedRequest : waitingForResourceManager.values()) {
		requestSlotFromResourceManager(gateway, parkedRequest);
	}

	// Everything has been sent, so nothing is waiting any more.
	waitingForResourceManager.clear();
}

  

进入requestSlotFromResourceManager
private void requestSlotFromResourceManager(
		final ResourceManagerGateway resourceManagerGateway,
		final PendingRequest pendingRequest) {
	// Remote call: this is where the slot is actually requested from the ResourceManager.
	// NOTE(review): `allocationId` is not declared in this excerpt; presumably the full
	// source creates it earlier in this method - confirm.
	final SlotRequest slotRequest =
		new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress);
	final CompletableFuture<Acknowledge> rmResponse =
		resourceManagerGateway.requestSlot(jobMasterId, slotRequest, rpcTimeout);

	FutureUtils.whenCompleteAsyncIfNotDone(
		rmResponse,
		componentMainThreadExecutor,
		(Acknowledge ignored, Throwable failure) -> {
			// A successful acknowledgement needs no action here.
			if (failure == null) {
				return;
			}
			// on failure, fail the request future
			slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure);
		});
}

  

进入 ResourceManager 类的 requestSlot：
public CompletableFuture<Acknowledge> requestSlot(
		JobMasterId jobMasterId,
		SlotRequest slotRequest,
		final Time timeout) {

	JobID jobId = slotRequest.getJobId();
	JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);

	// Only accept requests from a registered JobMaster whose id matches the registration.
	if (null != jobManagerRegistration) {
		if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
			log.info("Request slot with profile {} for job {} with allocation id {}.",
				slotRequest.getResourceProfile(),
				slotRequest.getJobId(),
				slotRequest.getAllocationId());

			try {
				// Hand the request to the ResourceManager's internal SlotManager; per this walkthrough,
				// in a YARN deployment this can end up requesting resources from YARN.
				slotManager.registerSlotRequest(slotRequest);
			} catch (ResourceManagerException e) {
				return FutureUtils.completedExceptionally(e);
			}
			// NOTE(review): the article's excerpt ends here; the success return value and
			// the else-branches of this method are not shown.

  进入

SlotManagerImpl
public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
	checkInit();

	// A request whose allocation id is already known is ignored.
	if (checkDuplicateRequest(slotRequest.getAllocationId())) {
		LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());

		return false;
	} else {
		PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);

		// Track the request by allocation id before trying to fulfil it.
		pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);

		try {
			// Try to fulfil the request.
			internalRequestSlot(pendingSlotRequest);
			// NOTE(review): the article's excerpt ends here; the matching catch block and
			// the rest of the method are not shown.

  

private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
		final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();

		OptionalConsumer.of(findMatchingSlot(resourceProfile))
			.ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
//空闲存在的槽 .ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest)); }

  进入

fulfillPendingSlotRequestWithPendingTaskManagerSlot

private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
		ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
		Optional<PendingTaskManagerSlot> pendingTaskManagerSlotOptional = findFreeMatchingPendingTaskManagerSlot(resourceProfile);

		if (!pendingTaskManagerSlotOptional.isPresent()) {
//找资源 pendingTaskManagerSlotOptional = allocateResource(resourceProfile); } OptionalConsumer.of(pendingTaskManagerSlotOptional) .ifPresent(pendingTaskManagerSlot -> assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot)) .ifNotPresent(() -> { // request can not be fulfilled by any free slot or pending slot that can be allocated, // check whether it can be fulfilled by allocated slots if (failUnfulfillableRequest && !isFulfillableByRegisteredOrPendingSlots(pendingSlotRequest.getResourceProfile())) { throw new UnfulfillableSlotRequestException(pendingSlotRequest.getAllocationId(), pendingSlotRequest.getResourceProfile()); } }); }

  

allocateResource

private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile requestedSlotResourceProfile) {
		final int numRegisteredSlots =  getNumberRegisteredSlots();
//挂起的资源 final int numPendingSlots = getNumberPendingTaskManagerSlots(); if (isMaxSlotNumExceededAfterAdding(numSlotsPerWorker)) { LOG.warn("Could not allocate {} more slots. The number of registered and pending slots is {}, while the maximum is {}.", numSlotsPerWorker, numPendingSlots + numRegisteredSlots, maxSlotNum); return Optional.empty(); } if (!defaultSlotResourceProfile.isMatching(requestedSlotResourceProfile)) { // requested resource profile is unfulfillable return Optional.empty(); } //分配资源 if (!resourceActions.allocateResource(defaultWorkerResourceSpec)) { // resource cannot be allocated return Optional.empty(); } PendingTaskManagerSlot pendingTaskManagerSlot = null; for (int i = 0; i < numSlotsPerWorker; ++i) { pendingTaskManagerSlot = new PendingTaskManagerSlot(defaultSlotResourceProfile); pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(), pendingTaskManagerSlot); } return Optional.of(Preconditions.checkNotNull(pendingTaskManagerSlot, "At least one pending slot should be created.")); }
allocateResource

@Override
public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
	// Must be called from the endpoint's main thread (checked by validateRunsInMainThread).
	validateRunsInMainThread();
	// Delegate to startNewWorker and report its result.
	final boolean workerStarted = startNewWorker(workerResourceSpec);
	return workerStarted;
}

  

startNewWorker 的具体实现类是 ActiveResourceManager：
@Override
public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
	// Issue the request for a new worker; this implementation always reports true afterwards.
	requestNewWorker(workerResourceSpec);
	final boolean requested = true;
	return requested;
}

 进入 requestNewWorker，请求新的工作节点（worker）。

至此，JobMaster 这一侧的工作结束。

接下来是 YARN 启动容器的入口，将在下一节介绍。

 

 



推荐阅读