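spring-boot - KafkaProducer InterruptedException during graceful shutdown of a Spring Boot application
Problem description
For a project, we are sending some events to Kafka. We use spring-kafka 2.6.2.
Because we use spring-vault, we have to restart/kill the application before the credential lease ends (the application is restarted automatically by Kubernetes). Our problem is that when we perform our graceful shutdown with applicationContext.close(), the KafkaProducer gets an InterruptedException ("Interrupted while joining ioThread") when it joins the ioThread in its close() method. In our case this means that some pending events are not sent to Kafka before shutdown, because the producer is force-closed due to the error during destruction.
The stack trace is below: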
2020-12-18 13:57:29.007 INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [ scheduling-1] o.s.b.w.e.tomcat.GracefulShutdown : Commencing graceful shutdown. Waiting for active requests to complete
2020-12-18 13:57:29.009 INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [ scheduling-1] o.apache.catalina.core.StandardService : Stopping service [Tomcat]
2020-12-18 13:57:29.013 INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [ scheduling-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Destroying Spring FrameworkServlet 'dispatcherServlet'
2020-12-18 13:57:29.014 INFO [titan-producer,,,] 1 --- [tomcat-shutdown] o.s.b.w.e.tomcat.GracefulShutdown : Graceful shutdown complete
2020-12-18 13:57:29.020 WARN [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [ scheduling-1] o.a.c.loader.WebappClassLoaderBase : The web application [ROOT] appears to have started a thread named [kafka-producer-network-thread | titan-producer-1] but has failed to stop it. This is very likely to create a memory leak. Stack trace of thread:
java.base@11.0.9.1/sun.nio.ch.EPoll.wait(Native Method)
java.base@11.0.9.1/sun.nio.ch.EPollSelectorImpl.doSelect(Unknown Source)
java.base@11.0.9.1/sun.nio.ch.SelectorImpl.lockAndDoSelect(Unknown Source)
java.base@11.0.9.1/sun.nio.ch.SelectorImpl.select(Unknown Source)
org.apache.kafka.common.network.Selector.select(Selector.java:873)
org.apache.kafka.common.network.Selector.poll(Selector.java:469)
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544)
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
java.base@11.0.9.1/java.lang.Thread.run(Unknown Source)
2020-12-18 13:57:29.021 WARN [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [ scheduling-1] o.a.c.loader.WebappClassLoaderBase : The web application [ROOT] appears to have started a thread named [micrometer-kafka-metrics] but has failed to stop it. This is very likely to create a memory leak. Stack trace of thread:
java.base@11.0.9.1/jdk.internal.misc.Unsafe.park(Native Method)
java.base@11.0.9.1/java.util.concurrent.locks.LockSupport.parkNanos(Unknown Source)
java.base@11.0.9.1/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown Source)
java.base@11.0.9.1/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(Unknown Source)
java.base@11.0.9.1/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(Unknown Source)
java.base@11.0.9.1/java.util.concurrent.ThreadPoolExecutor.getTask(Unknown Source)
java.base@11.0.9.1/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
java.base@11.0.9.1/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
java.base@11.0.9.1/java.lang.Thread.run(Unknown Source)
2020-12-18 13:57:29.046 INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [ scheduling-1] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
2020-12-18 13:57:29.048 INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [ scheduling-1] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 'taskScheduler'
2020-12-18 13:57:29.051 INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [ scheduling-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=titan-producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
2020-12-18 13:57:29.055 ERROR [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [ scheduling-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=titan-producer-1] Interrupted while joining ioThread
java.lang.InterruptedException: null
at java.base/java.lang.Object.wait(Native Method)
at java.base/java.lang.Thread.join(Unknown Source)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1205)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1182)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.closeDelegate(DefaultKafkaProducerFactory.java:901)
at org.springframework.kafka.core.DefaultKafkaProducerFactory.destroy(DefaultKafkaProducerFactory.java:428)
at org.springframework.beans.factory.support.DisposableBeanAdapter.destroy(DisposableBeanAdapter.java:258)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroyBean(DefaultSingletonBeanRegistry.java:587)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingleton(DefaultSingletonBeanRegistry.java:559)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.destroySingleton(DefaultListableBeanFactory.java:1092)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingletons(DefaultSingletonBeanRegistry.java:520)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.destroySingletons(DefaultListableBeanFactory.java:1085)
at org.springframework.context.support.AbstractApplicationContext.destroyBeans(AbstractApplicationContext.java:1061)
at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1030)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.doClose(ServletWebServerApplicationContext.java:170)
at org.springframework.context.support.AbstractApplicationContext.close(AbstractApplicationContext.java:979)
at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:68)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
2020-12-18 13:57:29.055 INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [ scheduling-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=titan-producer-1] Proceeding to force close the producer since pending requests could not be completed within timeout 30000 ms.
2020-12-18 13:57:29.056 WARN [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [ scheduling-1] o.s.b.f.support.DisposableBeanAdapter : Invocation of destroy method failed on bean with name 'kafkaProducerFactory': org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
2020-12-18 13:57:29.064 INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [ scheduling-1] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService
2020-12-18 13:57:29.065 INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [ scheduling-1] c.l.t.p.zookeeper.ZookeeperManagerImpl : Closing zookeeperConnection
2020-12-18 13:57:29.197 INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [ scheduling-1] org.apache.zookeeper.ZooKeeper : Session: 0x30022348ba6000b closed
2020-12-18 13:57:29.197 INFO [titan-producer,,,] 1 --- [d-1-EventThread] org.apache.zookeeper.ClientCnxn : EventThread shut down for session: 0x30022348ba6000b
2020-12-18 13:57:29.206 INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [ scheduling-1] com.zaxxer.hikari.HikariDataSource : loadtest_fallback_titan_pendingEvents - Shutdown initiated...
2020-12-18 13:57:29.221 INFO [titan-producer,222efdd2a07966ce,222efdd2a07966ce,true] 1 --- [ scheduling-1] com.zaxxer.hikari.HikariDataSource : loadtest_fallback_titan_pendingEvents - Shutdown completed.
Here is my configuration class:
@Flogger
@EnableKafka
@Configuration
@RequiredArgsConstructor
@ConditionalOnProperty(
        name = "titan.producer.kafka.enabled",
        havingValue = "true",
        matchIfMissing = true)
public class KafkaConfiguration {

    @Bean
    DefaultKafkaProducerFactoryCustomizer kafkaProducerFactoryCustomizer(ObjectMapper mapper) {
        return producerFactory -> producerFactory.setValueSerializer(new JsonSerializer<>(mapper));
    }

    @Bean
    public NewTopic createTopic(TitanProperties titanProperties, KafkaProperties kafkaProperties) {
        TitanProperties.Kafka kafka = titanProperties.getKafka();
        String defaultTopic = kafkaProperties.getTemplate().getDefaultTopic();
        int numPartitions = kafka.getNumPartitions();
        short replicationFactor = kafka.getReplicationFactor();
        log.atInfo()
                .log("Creating Kafka Topic %s with %s partitions and %s replicationFactor", defaultTopic, numPartitions, replicationFactor);
        return TopicBuilder.name(defaultTopic)
                .partitions(numPartitions)
                .replicas(replicationFactor)
                .config(MESSAGE_TIMESTAMP_TYPE_CONFIG, LOG_APPEND_TIME.name)
                .build();
    }
}
And my application.yaml:
spring:
  application:
    name: titan-producer
  kafka:
    client-id: ${spring.application.name}
    producer:
      key-serializer: org.apache.kafka.common.serialization.UUIDSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        max.block.ms: 2000
        request.timeout.ms: 2000
        delivery.timeout.ms: 2000 # must be greater than or equal to request.timeout.ms + linger.ms
    template:
      default-topic: titan-dev
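Our Vault configuration uses a scheduled task to execute applicationContext.close(). We run it at a random time because we have multiple replicas of the application running in parallel and want to avoid killing all of them at the same moment.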
@Flogger
@Configuration
@ConditionalOnBean(SecretLeaseContainer.class)
@ConditionalOnProperty(
        name = "titan.producer.scheduling.enabled",
        havingValue = "true",
        matchIfMissing = true)
public class VaultConfiguration {

    @Bean
    public Lifecycle scheduledAppRestart(Clock clock, TitanProperties properties, TaskScheduler scheduler, ConfigurableApplicationContext applicationContext) {
        Instant now = clock.instant();
        Duration maxTTL = properties.getVaultConfig().getCredsMaxLease();
        Instant start = now.plusSeconds(maxTTL.dividedBy(2).toSeconds());
        Instant end = now.plusSeconds(maxTTL.minus(properties.getVaultConfig().getCredsMaxLeaseExpirationThreshold()).toSeconds());
        Instant randomInstant = randBetween(start, end);
        return new ScheduledLifecycle(scheduler, applicationContext::close, "application restart before lease expiration", randomInstant);
    }

    private Instant randBetween(Instant startInclusive, Instant endExclusive) {
        long startSeconds = startInclusive.getEpochSecond();
        long endSeconds = endExclusive.getEpochSecond();
        long random = RandomUtils.nextLong(startSeconds, endSeconds);
        return Instant.ofEpochSecond(random);
    }
}
The ScheduledLifecycle class we use to run the scheduled task:
import lombok.extern.flogger.Flogger;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.TaskScheduler;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ScheduledFuture;

@Flogger
public class ScheduledLifecycle implements SmartLifecycle {

    private ScheduledFuture<?> future = null;
    private Duration delay = null;
    private final TaskScheduler scheduler;
    private final Runnable command;
    private final String commandDesc;
    private final Instant startTime;

    public ScheduledLifecycle(TaskScheduler scheduler, Runnable command, String commandDesc, Instant startTime) {
        this.scheduler = scheduler;
        this.command = command;
        this.commandDesc = commandDesc;
        this.startTime = startTime;
    }

    public ScheduledLifecycle(TaskScheduler scheduler, Runnable command, String commandDesc, Instant startTime, Duration delay) {
        this(scheduler, command, commandDesc, startTime);
        this.delay = delay;
    }

    @Override
    public void start() {
        if (delay != null) {
            log.atInfo().log("Scheduling %s: starting at %s, running every %s", commandDesc, startTime, delay);
            future = scheduler.scheduleWithFixedDelay(command, startTime, delay);
        } else {
            log.atInfo().log("Scheduling %s: execution at %s", commandDesc, startTime);
            future = scheduler.schedule(command, startTime);
        }
    }

    @Override
    public void stop() {
        if (future != null) {
            log.atInfo().log("Stop %s", commandDesc);
            future.cancel(true);
        }
    }

    @Override
    public boolean isRunning() {
        boolean running = future != null && (!future.isDone() && !future.isCancelled());
        // Both placeholders need an argument: the task description and the state.
        log.atFine().log("is %s running? %s", commandDesc, running);
        return running;
    }
}
Is there a bug in spring-kafka? Any ideas?
Thanks
Solution
future.cancel(true);
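This interrupts the thread that is closing the producer and is probably the root cause of the problem: the scheduled task runs applicationContext.close(), which calls stop() on the ScheduledLifecycle, which in turn cancels (and interrupts) the very task that is still running.
You should use future.cancel(false);
to allow the task to terminate in an orderly fashion, without being interrupted.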
/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run. If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);
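The effect described above can be reproduced with the plain JDK, without Spring or Kafka. The following is a minimal sketch, assuming a task that cancels its own future the way the question's stop() does while close() is running; the class and method names (SelfCancelDemo, interruptedAfterSelfCancel) are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the question's code base.
class SelfCancelDemo {

    /**
     * Schedules a task that cancels its own future while it is running and
     * reports whether the executing thread ended up with its interrupt flag set.
     */
    static boolean interruptedAfterSelfCancel(boolean mayInterruptIfRunning) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicReference<ScheduledFuture<?>> self = new AtomicReference<>();
        CountDownLatch futureAssigned = new CountDownLatch(1);
        CompletableFuture<Boolean> observed = new CompletableFuture<>();
        try {
            self.set(scheduler.schedule(() -> {
                try {
                    // Wait until the caller has stored the future in 'self'.
                    futureAssigned.await();
                    // Same call shape as stop() in the question's ScheduledLifecycle.
                    self.get().cancel(mayInterruptIfRunning);
                    // A later blocking call such as Thread.join() would throw
                    // InterruptedException if this flag is set.
                    observed.complete(Thread.currentThread().isInterrupted());
                } catch (InterruptedException e) {
                    observed.complete(true);
                }
            }, 0, TimeUnit.MILLISECONDS));
            futureAssigned.countDown();
            return observed.get(5, TimeUnit.SECONDS);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("cancel(true)  -> interrupted: " + interruptedAfterSelfCancel(true));
        System.out.println("cancel(false) -> interrupted: " + interruptedAfterSelfCancel(false));
    }
}
```

With cancel(true) the running thread's interrupt flag is set, which is what a subsequent Thread.join() inside KafkaProducer.close() would trip over; with cancel(false) the flag stays clear and the task can finish its work.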
EDIT
Also, ThreadPoolTaskScheduler.waitForTasksToCompleteOnShutdown
is false by default.
/**
* Set whether to wait for scheduled tasks to complete on shutdown,
* not interrupting running tasks and executing all tasks in the queue.
* <p>Default is "false", shutting down immediately through interrupting
* ongoing tasks and clearing the queue. Switch this flag to "true" if you
* prefer fully completed tasks at the expense of a longer shutdown phase.
* <p>Note that Spring's container shutdown continues while ongoing tasks
* are being completed. If you want this executor to block and wait for the
* termination of tasks before the rest of the container continues to shut
* down - e.g. in order to keep up other resources that your tasks may need -,
* set the {@link #setAwaitTerminationSeconds "awaitTerminationSeconds"}
* property instead of or in addition to this property.
* @see java.util.concurrent.ExecutorService#shutdown()
* @see java.util.concurrent.ExecutorService#shutdownNow()
*/
public void setWaitForTasksToCompleteOnShutdown(boolean waitForJobsToCompleteOnShutdown) {
this.waitForTasksToCompleteOnShutdown = waitForJobsToCompleteOnShutdown;
}
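The Javadoc above points at ExecutorService#shutdown() and ExecutorService#shutdownNow(), and the difference between the two is what matters for a task that is still running. Here is a small JDK-only sketch of that difference; the class and method names (ShutdownModesDemo, taskInterrupted) are made up for the illustration, and the 200 ms sleep merely stands in for work such as closing a producer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Spring or of the question's code base.
class ShutdownModesDemo {

    /** Returns true if the in-flight task was interrupted by the shutdown call. */
    static boolean taskInterrupted(boolean waitForTasks) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        Future<Boolean> result = pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(200);   // stands in for blocking work such as Thread.join()
                return false;        // completed normally
            } catch (InterruptedException e) {
                return true;         // interrupted by shutdownNow()
            }
        });
        started.await();             // make sure the task is running before shutting down
        if (waitForTasks) {
            pool.shutdown();         // running task is allowed to complete
        } else {
            pool.shutdownNow();      // worker threads are interrupted
        }
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return result.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("shutdownNow() interrupted task: " + taskInterrupted(false));
        System.out.println("shutdown()    interrupted task: " + taskInterrupted(true));
    }
}
```

This mirrors the two modes the flag selects between: the default ("false") corresponds to the interrupting variant, and "true" to the one that lets running tasks finish.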
You might also have to set awaitTerminationSeconds.
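As a rough sketch of how both settings could be applied, the bean below defines the scheduler explicitly. The configuration class name, the pool size, the thread name prefix, and the 30-second value are assumptions chosen for illustration (30 seconds simply matches the producer close timeout visible in the log); only the setter names come from Spring's ThreadPoolTaskScheduler.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

// Hypothetical configuration class; requires spring-context on the classpath.
@Configuration
class SchedulerShutdownConfiguration {

    @Bean
    ThreadPoolTaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(1);                              // assumed value
        scheduler.setThreadNamePrefix("scheduling-");          // assumed value
        // Do not interrupt running tasks when the scheduler is shut down.
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        // Upper bound on how long shutdown blocks waiting for tasks (assumed value).
        scheduler.setAwaitTerminationSeconds(30);
        return scheduler;
    }
}
```

One caveat for the setup in the question: applicationContext.close() itself runs on a scheduler thread, so a blocking wait for termination may end up waiting on the very task that is performing the shutdown until the timeout elapses. Keeping the value modest, or triggering the close from a thread outside the scheduler, may be worth considering.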