KafkaProducer InterruptedException during graceful shutdown of a Spring Boot application

Problem description

For a project we are sending events to Kafka. We use spring-kafka 2.6.2.

Because we use spring-vault, we have to restart/kill the application before the credential lease expires (the application is restarted automatically by Kubernetes). Our problem is that when we trigger our graceful shutdown with applicationContext.close(), the KafkaProducer gets an InterruptedException while joining its ioThread inside its close() method. As a result, in our case some pending events are never sent to Kafka before shutdown, because the producer is force-closed due to the error raised during bean destruction.

The stack trace is below:

2020-12-18 13:57:29.007  INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [   scheduling-1] o.s.b.w.e.tomcat.GracefulShutdown        : Commencing graceful shutdown. Waiting for active requests to complete
2020-12-18 13:57:29.009  INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [   scheduling-1] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2020-12-18 13:57:29.013  INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [   scheduling-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Destroying Spring FrameworkServlet 'dispatcherServlet'
2020-12-18 13:57:29.014  INFO [titan-producer,,,] 1 --- [tomcat-shutdown] o.s.b.w.e.tomcat.GracefulShutdown        : Graceful shutdown complete
2020-12-18 13:57:29.020  WARN [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [   scheduling-1] o.a.c.loader.WebappClassLoaderBase       : The web application [ROOT] appears to have started a thread named [kafka-producer-network-thread | titan-producer-1] but has failed to stop it. This is very likely to create a memory leak. Stack trace of thread:
 java.base@11.0.9.1/sun.nio.ch.EPoll.wait(Native Method)
 java.base@11.0.9.1/sun.nio.ch.EPollSelectorImpl.doSelect(Unknown Source)
 java.base@11.0.9.1/sun.nio.ch.SelectorImpl.lockAndDoSelect(Unknown Source)
 java.base@11.0.9.1/sun.nio.ch.SelectorImpl.select(Unknown Source)
 org.apache.kafka.common.network.Selector.select(Selector.java:873)
 org.apache.kafka.common.network.Selector.poll(Selector.java:469)
 org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544)
 org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
 java.base@11.0.9.1/java.lang.Thread.run(Unknown Source)
2020-12-18 13:57:29.021  WARN [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [   scheduling-1] o.a.c.loader.WebappClassLoaderBase       : The web application [ROOT] appears to have started a thread named [micrometer-kafka-metrics] but has failed to stop it. This is very likely to create a memory leak. Stack trace of thread:
 java.base@11.0.9.1/jdk.internal.misc.Unsafe.park(Native Method)
 java.base@11.0.9.1/java.util.concurrent.locks.LockSupport.parkNanos(Unknown Source)
 java.base@11.0.9.1/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown Source)
 java.base@11.0.9.1/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(Unknown Source)
 java.base@11.0.9.1/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(Unknown Source)
 java.base@11.0.9.1/java.util.concurrent.ThreadPoolExecutor.getTask(Unknown Source)
 java.base@11.0.9.1/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
 java.base@11.0.9.1/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
 java.base@11.0.9.1/java.lang.Thread.run(Unknown Source)
2020-12-18 13:57:29.046  INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [   scheduling-1] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
2020-12-18 13:57:29.048  INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [   scheduling-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService 'taskScheduler'
2020-12-18 13:57:29.051  INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [   scheduling-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=titan-producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
2020-12-18 13:57:29.055 ERROR [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [   scheduling-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=titan-producer-1] Interrupted while joining ioThread

java.lang.InterruptedException: null
        at java.base/java.lang.Object.wait(Native Method)
        at java.base/java.lang.Thread.join(Unknown Source)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1205)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1182)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.closeDelegate(DefaultKafkaProducerFactory.java:901)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory.destroy(DefaultKafkaProducerFactory.java:428)
        at org.springframework.beans.factory.support.DisposableBeanAdapter.destroy(DisposableBeanAdapter.java:258)
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroyBean(DefaultSingletonBeanRegistry.java:587)
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingleton(DefaultSingletonBeanRegistry.java:559)
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.destroySingleton(DefaultListableBeanFactory.java:1092)
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingletons(DefaultSingletonBeanRegistry.java:520)
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.destroySingletons(DefaultListableBeanFactory.java:1085)
        at org.springframework.context.support.AbstractApplicationContext.destroyBeans(AbstractApplicationContext.java:1061)
        at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1030)
        at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.doClose(ServletWebServerApplicationContext.java:170)
        at org.springframework.context.support.AbstractApplicationContext.close(AbstractApplicationContext.java:979)
        at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:68)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
2020-12-18 13:57:29.055  INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [   scheduling-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=titan-producer-1] Proceeding to force close the producer since pending requests could not be completed within timeout 30000 ms.
2020-12-18 13:57:29.056  WARN [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [   scheduling-1] o.s.b.f.support.DisposableBeanAdapter    : Invocation of destroy method failed on bean with name 'kafkaProducerFactory': org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
2020-12-18 13:57:29.064  INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [   scheduling-1] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService
2020-12-18 13:57:29.065  INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [   scheduling-1] c.l.t.p.zookeeper.ZookeeperManagerImpl   : Closing zookeeperConnection
2020-12-18 13:57:29.197  INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [   scheduling-1] org.apache.zookeeper.ZooKeeper           : Session: 0x30022348ba6000b closed
2020-12-18 13:57:29.197  INFO [titan-producer,,,] 1 --- [d-1-EventThread] org.apache.zookeeper.ClientCnxn          : EventThread shut down for session: 0x30022348ba6000b
2020-12-18 13:57:29.206  INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [   scheduling-1] com.zaxxer.hikari.HikariDataSource       : loadtest_fallback_titan_pendingEvents - Shutdown initiated...
2020-12-18 13:57:29.221  INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [   scheduling-1] com.zaxxer.hikari.HikariDataSource       : loadtest_fallback_titan_pendingEvents - Shutdown completed.

Here is my configuration class:

@Flogger
@EnableKafka
@Configuration
@RequiredArgsConstructor
@ConditionalOnProperty(
        name = "titan.producer.kafka.enabled",
        havingValue = "true",
        matchIfMissing = true)
public class KafkaConfiguration {

    @Bean
    DefaultKafkaProducerFactoryCustomizer kafkaProducerFactoryCustomizer(ObjectMapper mapper) {
        return producerFactory -> producerFactory.setValueSerializer(new JsonSerializer<>(mapper));
    }

    @Bean
    public NewTopic createTopic(TitanProperties titanProperties, KafkaProperties kafkaProperties) {
        TitanProperties.Kafka kafka = titanProperties.getKafka();
        String defaultTopic = kafkaProperties.getTemplate().getDefaultTopic();
        int numPartitions = kafka.getNumPartitions();
        short replicationFactor = kafka.getReplicationFactor();

        log.atInfo()
                .log("Creating Kafka Topic %s with %s partitions and %s replicationFactor", defaultTopic, numPartitions, replicationFactor);

        return TopicBuilder.name(defaultTopic)
                .partitions(numPartitions)
                .replicas(replicationFactor)
                .config(MESSAGE_TIMESTAMP_TYPE_CONFIG, LOG_APPEND_TIME.name)
                .build();
    }
}

And my application.yaml:

spring:
  application:
    name: titan-producer

  kafka:
    client-id: ${spring.application.name}
    producer:
      key-serializer: org.apache.kafka.common.serialization.UUIDSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

      properties:
        max.block.ms: 2000
        request.timeout.ms: 2000
        delivery.timeout.ms: 2000 #must be greater or equal to request.timeout.ms + linger.ms

    template:
      default-topic: titan-dev

Our Vault configuration schedules a task that invokes applicationContext.close(). We pick a random instant for it because we run several replicas of the application in parallel and want to avoid killing them all at the same time.

@Flogger
@Configuration
@ConditionalOnBean(SecretLeaseContainer.class)
@ConditionalOnProperty(
        name = "titan.producer.scheduling.enabled",
        havingValue = "true",
        matchIfMissing = true)
public class VaultConfiguration {

    @Bean
    public Lifecycle scheduledAppRestart(Clock clock, TitanProperties properties, TaskScheduler scheduler, ConfigurableApplicationContext applicationContext) {
        Instant now = clock.instant();
        Duration maxTTL = properties.getVaultConfig().getCredsMaxLease();
        Instant start = now.plusSeconds(maxTTL.dividedBy(2).toSeconds());
        Instant end = now.plusSeconds(maxTTL.minus(properties.getVaultConfig().getCredsMaxLeaseExpirationThreshold()).toSeconds());
        Instant randomInstant = randBetween(start, end);
        return new ScheduledLifecycle(scheduler, applicationContext::close, "application restart before lease expiration", randomInstant);
    }

    private Instant randBetween(Instant startInclusive, Instant endExclusive) {
        long startSeconds = startInclusive.getEpochSecond();
        long endSeconds = endExclusive.getEpochSecond();
        long random = RandomUtils.nextLong(startSeconds, endSeconds);

        return Instant.ofEpochSecond(random);
    }

}

The ScheduledLifecycle class we use to run the scheduled task:

import lombok.extern.flogger.Flogger;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.TaskScheduler;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ScheduledFuture;

@Flogger
public class ScheduledLifecycle implements SmartLifecycle {
    private ScheduledFuture<?> future = null;
    private Duration delay = null;
    private final TaskScheduler scheduler;
    private final Runnable command;
    private final String commandDesc;
    private final Instant startTime;

    public ScheduledLifecycle(TaskScheduler scheduler, Runnable command, String commandDesc, Instant startTime) {
        this.scheduler = scheduler;
        this.command = command;
        this.commandDesc = commandDesc;
        this.startTime = startTime;
    }

    public ScheduledLifecycle(TaskScheduler scheduler, Runnable command, String commandDesc, Instant startTime, Duration delay) {
        this(scheduler, command, commandDesc, startTime);
        this.delay = delay;
    }

    @Override
    public void start() {
        if (delay != null) {
            log.atInfo().log("Scheduling %s: starting at %s, running every %s", commandDesc, startTime, delay);
            future = scheduler.scheduleWithFixedDelay(command, startTime, delay);
        } else {
            log.atInfo().log("Scheduling %s: execution at %s", commandDesc, startTime);
            future = scheduler.schedule(command, startTime);
        }
    }

    @Override
    public void stop() {
        if (future != null) {
            log.atInfo().log("Stop %s", commandDesc);
            future.cancel(true);
        }

    }

    @Override
    public boolean isRunning() {
        boolean running = future != null && (!future.isDone() && !future.isCancelled());
        log.atFine().log("is %s running? %s", commandDesc, running);
        return running;
    }
}

Is there a bug in spring-kafka? Any ideas?

Thanks

Tags: spring-boot, spring-kafka

Solution


future.cancel(true);

This interrupts the producer thread and is most likely the root cause of the problem.

You should use future.cancel(false); instead, so the task can terminate in an orderly way without being interrupted (see the sketch after the quoted javadoc below).

    /**
     * Attempts to cancel execution of this task.  This attempt will
     * fail if the task has already completed, has already been cancelled,
     * or could not be cancelled for some other reason. If successful,
     * and this task has not started when {@code cancel} is called,
     * this task should never run.  If the task has already started,
     * then the {@code mayInterruptIfRunning} parameter determines
     * whether the thread executing this task should be interrupted in
     * an attempt to stop the task.
     *
     * <p>After this method returns, subsequent calls to {@link #isDone} will
     * always return {@code true}.  Subsequent calls to {@link #isCancelled}
     * will always return {@code true} if this method returned {@code true}.
     *
     * @param mayInterruptIfRunning {@code true} if the thread executing this
     * task should be interrupted; otherwise, in-progress tasks are allowed
     * to complete
     * @return {@code false} if the task could not be cancelled,
     * typically because it has already completed normally;
     * {@code true} otherwise
     */
    boolean cancel(boolean mayInterruptIfRunning);
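
For example, the stop() method of the ScheduledLifecycle class from the question could be changed like this (a minimal sketch of just the affected method, assuming the rest of the class stays exactly as posted):

    @Override
    public void stop() {
        if (future != null) {
            log.atInfo().log("Stop %s", commandDesc);
            // cancel(false) lets an in-progress run (here: applicationContext.close())
            // finish instead of interrupting the thread that is closing the KafkaProducer
            future.cancel(false);
        }
    }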

EDIT

Also note that ThreadPoolTaskScheduler.waitForTasksToCompleteOnShutdown is false by default.

    /**
     * Set whether to wait for scheduled tasks to complete on shutdown,
     * not interrupting running tasks and executing all tasks in the queue.
     * <p>Default is "false", shutting down immediately through interrupting
     * ongoing tasks and clearing the queue. Switch this flag to "true" if you
     * prefer fully completed tasks at the expense of a longer shutdown phase.
     * <p>Note that Spring's container shutdown continues while ongoing tasks
     * are being completed. If you want this executor to block and wait for the
     * termination of tasks before the rest of the container continues to shut
     * down - e.g. in order to keep up other resources that your tasks may need -,
     * set the {@link #setAwaitTerminationSeconds "awaitTerminationSeconds"}
     * property instead of or in addition to this property.
     * @see java.util.concurrent.ExecutorService#shutdown()
     * @see java.util.concurrent.ExecutorService#shutdownNow()
     */
    public void setWaitForTasksToCompleteOnShutdown(boolean waitForJobsToCompleteOnShutdown) {
        this.waitForTasksToCompleteOnShutdown = waitForJobsToCompleteOnShutdown;
    }

You may also have to set awaitTerminationSeconds.
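
A scheduler configured that way might look like the following (a minimal sketch, not from the original post; the SchedulerConfiguration class name, the pool size, and the 30-second timeout are arbitrary choices):

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

    @Configuration
    public class SchedulerConfiguration {

        @Bean
        public ThreadPoolTaskScheduler taskScheduler() {
            ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
            scheduler.setPoolSize(1);
            // do not interrupt running tasks on shutdown; let the close() task finish
            scheduler.setWaitForTasksToCompleteOnShutdown(true);
            // block container shutdown for up to 30 seconds while tasks terminate
            scheduler.setAwaitTerminationSeconds(30);
            return scheduler;
        }
    }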
