首页 > 解决方案 > Spring Cloud Stream (Kafka) 参数化指定错误通道 {destination}.{group}.errors

问题描述

我正在尝试查看我传递给 @ServiceActivator 的错误通道是否可以通过引用 YAML 中指定的值来限制/参数化,而不是在代码本身中硬编码实际目的地和使用者组。

@ServiceActivator(
        // I do not want to hardcode destination and consumer group here
        inputChannel = "stream-test-topic.my-consumer-group.errors"
    )
    public void handleError(ErrorMessage errorMessage) {
        // Getting exception objects
        Throwable errorMessagePayload = errorMessage.getPayload();
        log.error("exception occurred", errorMessagePayload);

        // Get message body
        Message<?> originalMessage = errorMessage.getOriginalMessage();
        if (originalMessage != null) {
            log.error("Message Body: {}", originalMessage.getPayload());
        } else {
            log.error("The message body is empty");
        }
    }

标签: javaspringkafka-consumer-apispring-kafkaspring-cloud-stream

解决方案


你不能用@ServiceActivator; 改用 Java DSL:

@Value("${error.channel}")
String errors;

@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(this.errors)
            .handle(msg -> {
                System.out.println(msg);
            })
            .get();
}

并设置

error:
  channel: stream-test-topic.my-consumer-group.errors

推荐阅读