Flink: the "Heartbeat of TaskManager timed out" and "Heartbeat of ResourceManager timed out" problem

createweb 2020-04-22 11:30

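I recently deployed a Flink job that stopped on its own after running for a while, which was quite frustrating. I looked up the time of the last checkpoint and went through the logs around that point: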

2019-12-13 07:25:24.566 flink [flink-akka.actor.default-dispatcher-41] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job PayOrder (88c9cc0c85875332cc5e4ed6418cd667) switched from state RUNNING to FAILING.
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_1566481621886_4397244_01_000004 timed out.
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1656)
    at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2019-12-13 07:25:24.519 flink [flink-akka.actor.default-dispatcher-20] INFO  org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 9b7931812dbed76060b48a696d72a869: The heartbeat of ResourceManager with id 9b7931812dbed76060b48a696d72a869 timed out..

Searching the Flink source for "Heartbeat of TaskManager with id" and "The heartbeat of ResourceManager with id" turns up the following two listeners, both inner classes of JobMaster:

    private class TaskManagerHeartbeatListener implements HeartbeatListener<AccumulatorReport, Void> {

        private final JobMasterGateway jobMasterGateway;

        private TaskManagerHeartbeatListener(JobMasterGateway jobMasterGateway) {
            this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
        }

        @Override
        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            jobMasterGateway.disconnectTaskManager(
                resourceID,
                new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out."));
        }

        @Override
        public void reportPayload(ResourceID resourceID, AccumulatorReport payload) {
            for (AccumulatorSnapshot snapshot : payload.getAccumulatorSnapshots()) {
                schedulerNG.updateAccumulators(snapshot);
            }
        }

        @Override
        public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
            return CompletableFuture.completedFuture(null);
        }
    }

    private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> {

        @Override
        public void notifyHeartbeatTimeout(final ResourceID resourceId) {
            runAsync(() -> {
                log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);

                if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId)) {
                    reconnectToResourceManager(
                        new JobMasterException(
                            String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId)));
                }
            });
        }

        @Override
        public void reportPayload(ResourceID resourceID, Void payload) {
            // nothing to do since the payload is of type Void
        }

        @Override
        public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
            return CompletableFuture.completedFuture(null);
        }
    }
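The stack trace shows where the exception comes from. HeartbeatManagerImpl$HeartbeatMonitor.run fires and calls the listener's notifyHeartbeatTimeout. The idea behind such a per-target monitor can be sketched with a plain ScheduledExecutorService under a simplified model: every heartbeat that arrives cancels the pending timeout task and schedules a fresh one, so the task only runs if the target stays silent for the full timeout. HeartbeatMonitorSketch and runScenario are hypothetical names invented for this illustration. This is not Flink's HeartbeatManagerImpl.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical, simplified per-target heartbeat monitor (not Flink's HeartbeatManagerImpl).
public final class HeartbeatMonitorSketch {
    private final String resourceId;
    private final long timeoutMillis;
    private final ScheduledExecutorService scheduler;
    private final Consumer<String> onTimeout; // plays the role of notifyHeartbeatTimeout
    private ScheduledFuture<?> pendingTimeout;

    public HeartbeatMonitorSketch(String resourceId, long timeoutMillis,
                                  ScheduledExecutorService scheduler, Consumer<String> onTimeout) {
        this.resourceId = resourceId;
        this.timeoutMillis = timeoutMillis;
        this.scheduler = scheduler;
        this.onTimeout = onTimeout;
        reportHeartbeat(); // arm the timeout as soon as the target is monitored
    }

    // Called for every heartbeat received from the target: cancel the old timeout, arm a new one.
    public synchronized void reportHeartbeat() {
        if (pendingTimeout != null) {
            pendingTimeout.cancel(false);
        }
        pendingTimeout = scheduler.schedule(
            () -> onTimeout.accept(resourceId), timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Sends `beats` heartbeats `gapMillis` apart, then stays silent for `silenceMillis`.
    // Returns {timeouts fired while heartbeats were arriving, timeouts fired in total}.
    public static int[] runScenario(long timeoutMillis, int beats, long gapMillis, long silenceMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger fired = new AtomicInteger();
        try {
            HeartbeatMonitorSketch monitor = new HeartbeatMonitorSketch("tm-1", timeoutMillis, scheduler, id -> {
                fired.incrementAndGet();
                System.out.println("Heartbeat of TaskManager with id " + id + " timed out.");
            });
            for (int i = 0; i < beats; i++) {
                TimeUnit.MILLISECONDS.sleep(gapMillis);
                monitor.reportHeartbeat();
            }
            int duringHeartbeats = fired.get();
            TimeUnit.MILLISECONDS.sleep(silenceMillis);
            return new int[] {duringHeartbeats, fired.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] result = runScenario(300, 5, 50, 1000);
        System.out.println("during heartbeats: " + result[0] + ", total: " + result[1]);
    }
}
```

Calling runScenario(300, 5, 50, 1000) keeps the monitor quiet while heartbeats arrive every 50 ms. It then prints the timeout message once after the target goes silent. Cancel-and-reschedule keeps only one pending task per target, at the cost of a small race: a heartbeat that arrives just as the timeout task starts running cannot stop it.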

The TaskManager listener is then passed in when JobMaster creates its heartbeat manager:

this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
    resourceId,
    new TaskManagerHeartbeatListener(selfGateway),
    rpcService.getScheduledExecutor(),
    log);

Following this into HeartbeatServices:

/**
 * HeartbeatServices gives access to all services needed for heartbeating. This includes the
 * creation of heartbeat receivers and heartbeat senders.
 */
public class HeartbeatServices {

    /** Heartbeat interval for the created services. */
    protected final long heartbeatInterval;

    /** Heartbeat timeout for the created services. */
    protected final long heartbeatTimeout;

    public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
        Preconditions.checkArgument(0L < heartbeatInterval, "The heartbeat interval must be larger than 0.");
        Preconditions.checkArgument(heartbeatInterval <= heartbeatTimeout, "The heartbeat timeout should be larger or equal than the heartbeat interval.");

        this.heartbeatInterval = heartbeatInterval;
        this.heartbeatTimeout = heartbeatTimeout;
    }

    /**
     * Creates a heartbeat manager which does not actively send heartbeats.
     *
     * @param resourceId Resource Id which identifies the owner of the heartbeat manager
     * @param heartbeatListener Listener which will be notified upon heartbeat timeouts for registered
     *                          targets
     * @param scheduledExecutor Scheduled executor to be used for scheduling heartbeat timeouts
     * @param log Logger to be used for the logging
     * @param <I> Type of the incoming payload
     * @param <O> Type of the outgoing payload
     * @return A new HeartbeatManager instance
     */
    public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
        ResourceID resourceId,
        HeartbeatListener<I, O> heartbeatListener,
        ScheduledExecutor scheduledExecutor,
        Logger log) {

        return new HeartbeatManagerImpl<>(
            heartbeatTimeout,
            resourceId,
            heartbeatListener,
            scheduledExecutor,
            scheduledExecutor,
            log);
    }

    /**
     * Creates a heartbeat manager which actively sends heartbeats to monitoring targets.
     *
     * @param resourceId Resource Id which identifies the owner of the heartbeat manager
     * @param heartbeatListener Listener which will be notified upon heartbeat timeouts for registered
     *                          targets
     * @param scheduledExecutor Scheduled executor to be used for scheduling heartbeat timeouts
     * @param log Logger to be used for the logging
     * @param <I> Type of the incoming payload
     * @param <O> Type of the outgoing payload
     * @return A new HeartbeatManager instance which actively sends heartbeats
     */
    public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
        ResourceID resourceId,
        HeartbeatListener<I, O> heartbeatListener,
        ScheduledExecutor scheduledExecutor,
        Logger log) {

        return new HeartbeatManagerSenderImpl<>(
            heartbeatInterval,
            heartbeatTimeout,
            resourceId,
            heartbeatListener,
            scheduledExecutor,
            scheduledExecutor,
            log);
    }

    /**
     * Creates an HeartbeatServices instance from a {@link Configuration}.
     *
     * @param configuration Configuration to be used for the HeartbeatServices creation
     * @return An HeartbeatServices instance created from the given configuration
     */
    public static HeartbeatServices fromConfiguration(Configuration configuration) {
        long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);

        long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);

        return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
    }
}
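Both factory methods receive heartbeatTimeout, but only createHeartbeatManagerSender also uses heartbeatInterval. The JobMaster side actively requests heartbeats from each TaskManager, and the timeout measures how long it tolerates silence. The discrete-time simulation below is a hypothetical sketch under a simplified model: a healthy target answers every request immediately, and each answer resets the deadline to now + timeout. Under that model, a target that stops answering is declared lost roughly `timeout` ms after its last answered request.

```java
import java.util.OptionalLong;

// Hypothetical discrete-time model of the sender side (not Flink code).
public class HeartbeatSenderSimulation {

    /**
     * @param interval      ms between heartbeat requests
     * @param timeout       ms of silence tolerated before the target is declared lost
     * @param targetStopsAt time (ms) from which the target no longer answers
     * @param horizon       how long (ms) to simulate
     * @return time (ms) at which the timeout fires, or empty if it does not fire within the horizon
     */
    public static OptionalLong detectTimeout(long interval, long timeout, long targetStopsAt, long horizon) {
        if (interval <= 0 || timeout < interval) {
            throw new IllegalArgumentException("need 0 < interval <= timeout");
        }
        long deadline = timeout; // monitoring starts when the target registers at t = 0
        // A request tick that reaches the deadline is too late: the timeout has already fired.
        for (long now = 0; now <= horizon && now < deadline; now += interval) {
            if (now < targetStopsAt) {
                deadline = now + timeout; // answer received: reset the timeout
            }
        }
        return deadline <= horizon ? OptionalLong.of(deadline) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // The target answers up to and including the request at t = 60 s, then goes silent.
        System.out.println(detectTimeout(10_000, 50_000, 60_001, 3_600_000).getAsLong());  // 110000
        System.out.println(detectTimeout(10_000, 180_000, 60_001, 3_600_000).getAsLong()); // 240000
    }
}
```

In this model, raising the timeout from 50000 to 180000 means a TaskManager that has really died is detected about 130 seconds later (110 s versus 240 s in the example). A larger value therefore trades slower failure detection for tolerance of short stalls.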

So the timeout is indeed taken from HeartbeatManagerOptions.HEARTBEAT_TIMEOUT:

    /** Timeout for requesting and receiving heartbeat for both sender and receiver sides. */
    public static final ConfigOption<Long> HEARTBEAT_TIMEOUT =
            key("heartbeat.timeout")
            .defaultValue(50000L)
            .withDescription("Timeout for requesting and receiving heartbeat for both sender and receiver sides.");

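The heartbeat timeout may be caused by heavy load on the YARN cluster. As a stopgap, I raised this value in conf/flink-conf.yaml and will keep observing. The value is in milliseconds, so this goes from the default 50 s to 180 s: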

#Timeout for requesting and receiving heartbeat for both sender and receiver sides.
heartbeat.timeout: 180000
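To sanity-check the new value before redeploying, the lookup and the two preconditions from the HeartbeatServices constructor can be mirrored outside Flink. HeartbeatConfigCheck is a hypothetical helper, not a Flink class. It reads heartbeat.interval and heartbeat.timeout from flink-conf.yaml-style `key: value` lines and falls back to defaults: 10000 ms for the interval, which is Flink's HEARTBEAT_INTERVAL default but is not shown in this article, and 50000 ms for the timeout. The parser is deliberately minimal and only understands plain millisecond numbers.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Flink) that mirrors the HeartbeatServices checks.
public class HeartbeatConfigCheck {
    static final long DEFAULT_INTERVAL_MS = 10_000L; // assumed HEARTBEAT_INTERVAL default
    static final long DEFAULT_TIMEOUT_MS = 50_000L;  // HEARTBEAT_TIMEOUT default shown above

    // Minimal "key: value" parser: skips blank lines and '#' comments; no nesting, quoting, or lists.
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new HashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) continue;
            int colon = line.indexOf(':');
            if (colon < 0) continue;
            conf.put(line.substring(0, colon).trim(), line.substring(colon + 1).trim());
        }
        return conf;
    }

    // Returns {interval, timeout} in ms, throwing like the HeartbeatServices constructor would.
    static long[] heartbeatSettings(Map<String, String> conf) {
        long interval = Long.parseLong(conf.getOrDefault("heartbeat.interval", String.valueOf(DEFAULT_INTERVAL_MS)));
        long timeout = Long.parseLong(conf.getOrDefault("heartbeat.timeout", String.valueOf(DEFAULT_TIMEOUT_MS)));
        if (interval <= 0) {
            throw new IllegalArgumentException("The heartbeat interval must be larger than 0.");
        }
        if (interval > timeout) {
            throw new IllegalArgumentException("The heartbeat timeout should be larger or equal than the heartbeat interval.");
        }
        return new long[] {interval, timeout};
    }

    public static void main(String[] args) {
        List<String> yaml = List.of(
            "#Timeout for requesting and receiving heartbeat for both sender and receiver sides.",
            "heartbeat.timeout: 180000");
        long[] s = heartbeatSettings(parse(yaml));
        System.out.printf("interval=%d ms, timeout=%d ms (~%d request intervals)%n", s[0], s[1], s[1] / s[0]);
    }
}
```

With only heartbeat.timeout: 180000 set, main prints `interval=10000 ms, timeout=180000 ms (~18 request intervals)`. Setting an interval larger than the timeout is rejected, just as HeartbeatServices rejects it at startup.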
