spring - Spring Cloud Producer 异常 - java.lang.IllegalStateException: Producer 关闭后无法执行操作
问题描述
Spring Cloud Producer 抛出异常 - java.lang.IllegalStateException: Producer 关闭后无法执行操作
嗨,我们提供了一个基于 Spring Cloud 的微服务应用程序。在该应用程序中,我们在向 Kafka 生成消息时遇到错误。我们正在使用以下 Spring Cloud 版本。
春云版
格林威治.SR1
以下是 Kafka 生产者使用的接口定义
String INPUT = "jobmanager-in";
String OUTPUT_C = "collection-out";
String OUTPUT_P = "parser-out";
String OUTPUT_CMP = "compare-out";
String OUTPUT_R = "report-out";
String OUTPUT_N = "notification-out";
@Input(INPUT)
SubscribableChannel inboundJobManager();
@Output(OUTPUT_C)
MessageChannel outboundCollections();
@Output(OUTPUT_P)
MessageChannel outboundParse();
@Output(OUTPUT_CMP)
MessageChannel outboundCompare();
@Output(OUTPUT_R)
MessageChannel outboundReport();
@Output(OUTPUT_N)
MessageChannel outboundNotification();
生产者代码是
public void sendCollectionTask(final MessageT message) {
logger.info("Sending Collection Task :: " + message.toString());
MessageChannel messageChannel = collectionStream.outboundCollections();
boolean l_bool = messageChannel.send(MessageBuilder.withPayload(message)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());
logger.info("After sending message for Collection Task send status :: " + message.getTask().getId() + " : "
+ l_bool);
}
当我们启动应用程序时,生产者会正确生成消息,但一段时间后我们会收到错误消息
2019-09-12 02:07:45,215 ERROR com.ericsson.tmo.cm.ccm.jobmanager.util.JobOrchestrator [scheduling-1] Error Trace ::
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@11328ab9]; nested exception is java.lang.IllegalStateException: Cannot perform operation after producer has been closed
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:179)
如果我们重新启动应用程序,问题就解决了。谢谢你的帮助。
我们的 Spring Cloud 配置是
cloud:
stream:
kafka:
binder:
brokers: kafka
autoCreateTopic: false
configuration:
auto.offset.reset: latest
max.request.size: 16777216
buffer.memory: 67108864
bindings:
collection-out:
destination: collection
contentType: application/json
group: collection
producer:
partitionCount: 5
autoAddPartitions: true
解决方案
异常消息很清楚。
生产者关闭后无法执行操作
您的生产者可能会被一个线程关闭,而生产者正在调用的另一个线程send()
或生产者网络线程仍在发送消息。
由于您使用的是 Spring,我认为该KafkaProducer
对象是为您创建并通过依赖注入注入的。
您需要做的就是找出使用相同 KafkaProducer
对象的人以及close()
调用的位置。
可能的原因
有时,同一应用程序的其他几个组件可能会使用相同
KafkaProducer
的组件(如果您没有在多个生产者之间划定界限),并且其中一个可能已经关闭了它。更清楚地说,您可能正在使用相同的生产者 bean 实例为同一个实例中的不同组件生成消息,并且在某个它关闭的地方。
通常,我们有不同的生产者生产不同的主题,因为我们有不同的生产者配置和不同类型的数据的主题。
当我添加一个关闭钩子时,我遇到了类似的异常,
producer.close()
而另一个线程正在尝试生成该钩子。
推荐阅读
- kubernetes - 不在k8s中加载秘密
- javascript - 使用 npm 中的 react-gallery 组件从 flickr api url 渲染图像
- python - 打印特定组合
- java - Java中使用锁的生产者/消费者线程示例
- php - 如何在php中模拟POST表单提交?
- javascript - 异步函数不等待结束
- pyinstaller - 我正在尝试使用 pip 安装 Pyinstaller v 4.3,但出现未找到模块错误
- php - CKFinder tempDirectory 不能在 Linux 上运行?
- azure - Snowflake 中的外部 Azure 舞台
- php - 如何将 sendmail.php 的执行限制为同一服务器内的一页?