error-handling - 应用程序运行时异常未发送到 errorChannel 或 ServiceActivator 无法侦听 errorChannel
问题描述
在使用 @StreamListener 监听 kafka 主题后,在 RuntimeException、全局 erroChannel 或主题特定的 errorChannel (topic.group.errors) 未收到任何错误消息。@ServiceActivator 没有收到任何东西。
POM 依赖项:Greenwich.RELEASE
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
应用程序属性
spring.cloud.stream.bindings.input.destination=input
spring.cloud.stream.bindings.input.group=myGroup
spring.cloud.stream.bindings.input.consumer.useNativeDecoding=true
spring.cloud.stream.kafka.streams.bindings.input.consumer.enableDlq=true
spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName=input_deadletter
spring.cloud.stream.kafka.streams.bindings.input.consumer.autoCommitOnError=true
spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.bindings.output.destination=output
spring.cloud.stream.bindings.output.content-Type=application/*+avro
spring.cloud.stream.bindings.output.producer.useNativeEncoding=true
spring.cloud.stream.bindings.output.producer.errorChannelEnabled=true
spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.schemaRegistryClient.endpoint.schema.avro.schema-locations=classpath:avro/*.avsc
spring.cloud.stream.kafka.streams.binder.brokers=localhost
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url=http://localhost:8082
spring.cloud.stream.kafka.streams.binder.application-id=myGroup
spring.cloud.stream.kafka.streams.binder.serdeError=sendtodlq
我可以在日志中看到服务激活器已注册并订阅了错误频道。一旦发生运行时异常,所有流都会停止并进入关闭模式。
Registering beans for JMX exposure on startup
org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageChannel input.myGroup.errors
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageChannel,name="input-myGroup.errors"': registering with JMX server as MBean [org.springframework.integration:type=MessageChannel,name="input.myGroup.errors"] org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageChannel errorChannel
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageChannel,name=errorChannel': registering with JMX server as MBean [org.springframework.integration:type=MessageChannel,name=errorChannel]
org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageChannel nullChannel
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageChannel,name=nullChannel': registering with JMX server as MBean [org.springframework.integration:type=MessageChannel,name=nullChannel]
org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageHandler errorLogger
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageHandler,name=errorLogger,bean=internal': registering with JMX server as MBean [org.springframework.integration:type=MessageHandler,name=errorLogger,bean=internal]
org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageHandler myTopicListener.error.serviceActivator
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageHandler,name=myTopicListener.error.serviceActivator,bean=endpoint': registering with JMX server as MBean [org.springframework.integration:type=MessageHandler,name=myTopicListener.error.serviceActivator,bean=endpoint]
org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageHandler myTopicListener.errorGlobal.serviceActivator
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageHandler,name=myTopicListener.errorGlobal.serviceActivator,bean=endpoint': registering with JMX server as MBean [org.springframework.integration:type=MessageHandler,name=myTopicListener.errorGlobal.serviceActivator,bean=endpoint]
org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor - No @KafkaListener annotations found on bean type: class org.springf
@SendTo(MyStreams.OUTPUT)
public KStream<Key, MyEntity> process(KStream<Key, Envelope> myStreamObject) {
return myStreamObject.mapValues(this::transform);
}
@ServiceActivator(inputChannel = "input.myGroup.errors") //channel name 'input.myGroup.errors'
public void error(Message<?> message) {
System.out.println("Handling ERROR: " + message);
}
@ServiceActivator(inputChannel = "errorChannel")
public void errorGlobal(Message<?> message) {
System.out.println("Handling ERROR: GLOBAL " + message);
}
解决方案
kafka流绑定器不是基于MessageChannel
s 的,因此没有Message<?>
发送到错误通道。
标准的 kafka binder 是一个MessageChannelBinder
并支持错误通道。
使用 Kafka Streams,您必须实现自己的错误处理。
推荐阅读
- android - 通过程序操作除 main_xml 之外的 xml
- selenium - 等待执行两个动作 - selenium/java
- python - 找不到请求的 Boost 库
- vim - 如何在 Vim 中更改运算符突出显示颜色?
- certificate - 创建知道数字签名密钥的.key文件
- python - 在 OpenCV 中跳过帧并寻找到 RTSP 流的结尾
- ios - 如何为 CATextLayer 的字符串设置动画?
- python - Cython 编译器错误
- php - Cakephp - 无法提交事务 - 回滚()
- apache-spark - 在 Apache Spark SQL 中使用 collect_set(struct