spring-cloud-stream - Spring Cloud Stream - 本地消费者拦截消息并阻止其他人
问题描述
我正在尝试将应用程序消息传递spring cloud stream
依赖于RabbitMQ
活页夹。
我有一条一般消息\事件。一些应用程序可能是此类消息的来源,其中一些应用程序也是此类消息的消费者。
每个应用程序对该消息数据执行不同的逻辑。
例如,Application A
:
@Bean
public Consumer<Message<MyMessage>> onMyMessage() {
return msg -> {
// Get the contained message, log and return it
MyActualMessage myActualMessage = msg.getPayload();
log.debug("Received Cloud Event with some important data as POJO: " + myActualMessage toString());
// Handle manual approval request has been decided
if (myActualMessage.getOperationData() == OperationData.Charge) {
// Do some logic related to this micro service...
}
};
}
但Application A
也可以产生这样的事件:
public void sendDirectMessage(MyActualMessage message) {
try {
// Wrap the message in CloudEvent spec message
Message<String> inputMessage = CloudEventMessageBuilder
.withData(objectMapper.writeValueAsString(message))
.setSource("https://spring.io/spring-event")
.setType("com.example.springevent")
.build();
this.streamBridge.send("onMyMessage-in-0", inputMessage);
}
catch(Exception ex){
throw new MyMessagingException(String.format("Failed to send a a message: %s", message.toString()), ex);
}
}
其他应用程序也是如此。
当我发送一条消息时,我希望它能够到达所有消费者,但它只发送给本地消费者......(本地消费者可以通过检查发件人字段来忽略它,如果它不会被过滤也不是问题)。
我试图为每个应用程序提供不同的消费者组名称(也许我的定义是错误的?)无济于事......
spring:
cloud:
function:
definition: onMyMessage
stream:
bindings:
input:
group: ApplicationA
(每个应用程序的组名都不同)。 知道可能是什么问题,我该如何克服它?
解决方案
我认为您对生产者和消费者的绑定名称感到一团糟。
输入 - <functionName> + -in- + <index>
输出 - <functionName> + -out- + <index>
话说回来。您应该按如下方式调用 streamBridge:
this.streamBridge.send("onMyMessage-out-0", inputMessage);
你的绑定配置应该是这样的:
spring:
cloud:
function:
definition: onMyMessage
stream:
bindings:
onMyMessage-out-0:
destination: my-topic
onMyMessage-in-0:
destination: my-topic
group: ApplicationA
推荐阅读
- c# - FFImageLoading 在 Android 中引发异常并出现错误:System.TypeLoadException:由于找不到方法,无法加载方法覆盖列表:
- python - python StartPage 实例没有属性's'
- java - 使用 java 静态错误读取 Json
- bash - 在 Travis CI 上激活 conda
- wordpress - wp_enqueue_style() 不会消失
- wso2 - 使用 WSO2SP 时,如何在 siddhi 中使用 siddhi-io-csv 扩展时将原始文件名(输入文件)注入到定义的流中
- excel - 带有变量和空格的 VBA Shell 命令
- c# - 重新定位后 Oculus Go 的拍摄问题
- c# - AddRange 导致列表的内容被更改
- python - 更改列表中每个变量的变量值