java - Spring Cloud Stream (Kafka) 参数化指定错误通道 {destination}.{group}.errors
问题描述
我正在尝试查看我传递给 @ServiceActivator 的错误通道是否可以通过引用 YAML 中指定的值来限制/参数化,而不是在代码本身中硬编码实际目的地和使用者组。
@ServiceActivator(
// I do not want to hardcode destination and consumer group here
inputChannel = "stream-test-topic.my-consumer-group.errors"
)
public void handleError(ErrorMessage errorMessage) {
// Getting exception objects
Throwable errorMessagePayload = errorMessage.getPayload();
log.error("exception occurred", errorMessagePayload);
// Get message body
Message<?> originalMessage = errorMessage.getOriginalMessage();
if (originalMessage != null) {
log.error("Message Body: {}", originalMessage.getPayload());
} else {
log.error("The message body is empty");
}
}
解决方案
你不能用@ServiceActivator
; 改用 Java DSL:
@Value("${error.channel}")
String errors;
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(this.errors)
.handle(msg -> {
System.out.println(msg);
})
.get();
}
并设置
error:
channel: stream-test-topic.my-consumer-group.errors
推荐阅读
- java - 我的应用不再要求用户允许写入外部存储,为什么?
- android - Android:在部署的应用程序中找不到资源异常
- java - 如何将编辑文本值添加到列表视图
- node.js - 有人可以在 try and catch 期间获得错误状态吗?节点.js
- php - Php - 如何在不设置路由的情况下接收简单的帖子正文?
- javascript - 如何在 Nuxt 中动态注入样式?
- vmware - 在 PowerCLI 中将主机名添加到 Get-Datastore 命令的输出
- llvm - 如何让 ScalarEvolution 重新计算 SCEV 值?
- docker - 错误:Kubernetes 集群中的 Heartbeat 日志中没有这样的主机
- python - ValueError:无法将字符串转换为浮点数:'HH_Income'