首页 > 解决方案 > Spring Cloud Stream - 本地消费者拦截消息并阻止其他人

问题描述

我正在尝试将应用程序消息传递spring cloud stream依赖于RabbitMQ活页夹。
我有一条一般消息\事件。一些应用程序可能是此类消息的来源,其中一些应用程序也是此类消息的消费者。
每个应用程序对该消息数据执行不同的逻辑。
例如,Application A

 @Bean
    public Consumer<Message<MyMessage>> onMyMessage() {
        return msg -> {
            // Get the contained message, log and return it
            MyActualMessage myActualMessage = msg.getPayload();
            log.debug("Received Cloud Event with some important data as POJO: " + myActualMessage toString());

            // Handle manual approval request has been decided
            if (myActualMessage.getOperationData() == OperationData.Charge) {
                // Do some logic related to this micro service...
            }
        };
    }

Application A也可以产生这样的事件:

public void sendDirectMessage(MyActualMessage message) {

        try {
            // Wrap the message in CloudEvent spec message
            Message<String> inputMessage = CloudEventMessageBuilder
                    .withData(objectMapper.writeValueAsString(message))
                    .setSource("https://spring.io/spring-event")
                    .setType("com.example.springevent")
                    .build();

            this.streamBridge.send("onMyMessage-in-0", inputMessage);
        }
        catch(Exception ex){
            throw new MyMessagingException(String.format("Failed to send a a message: %s", message.toString()), ex);
        }
    }

其他应用程序也是如此。 当我发送一条消息时,我希望它能够到达所有消费者,但它只发送给本地消费者......(本地消费者可以通过检查发件人字段来忽略它,如果它不会被过滤也不是问题)。
我试图为每个应用程序提供不同的消费者组名称(也许我的定义是错误的?)无济于事......

spring:
  cloud:
    function:
      definition: onMyMessage
    stream:
      bindings:
        input:
          group: ApplicationA

(每个应用程序的组名都不同)。 知道可能是什么问题,我该如何克服它?

标签: spring-cloud-stream

解决方案


我认为您对生产者和消费者的绑定名称感到一团糟。

首先查看https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_functional_binding_names上的绑定名称约定

输入 - <functionName> + -in- + <index>
输出 - <functionName> + -out- + <index>

话说回来。您应该按如下方式调用 streamBridge:

this.streamBridge.send("onMyMessage-out-0", inputMessage);

你的绑定配置应该是这样的:

spring:
  cloud:
    function:
      definition: onMyMessage
    stream:
      bindings:
        onMyMessage-out-0:
          destination: my-topic
        onMyMessage-in-0:
          destination: my-topic
          group: ApplicationA

推荐阅读