首页 > 技术文章 > SpringCloud Stream整合RabbitMQ3.5.0

zgq7 2020-11-02 18:10 原文

前言

点击进入Spring官网文档

本文章为单体项目,将消费者和生产者写在同一个项目中,介意者不用向下看了。

本文介绍三种应用方式:

1:普通整合RabbitMQ

2:消息分区

3:按条件消费(多个消费者只消费同一队列中满足自己条件的消息)

1:核心依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    <version>${spring.cloud.stream}</version>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    <version>${spring.cloud.stream}</version>
</dependency>

全部依赖:
drawing

项目目录图:
drawing

2:基础版整合RabbitMQ

①:application.properties

spring.rabbitmq.host=192.168.1.218
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.cloud.stream.bindings.dev-exchange.destination=dev-exchange
spring.cloud.stream.bindings.dev-exchange.group=dev-queue
spring.cloud.stream.bindings.dev-exchange.content-type=application/json
spring.cloud.stream.bindings.dev-exchange.consumer.concurrency=1
spring.cloud.stream.bindings.dev-exchange.consumer.max-attempts=1

②:定义生产者和消费者接口

import com.boot.rabbitmq.constance.MQConstants;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface RabbitStream {

    /**
     * 消息流入(消费)
     **/
    @Input(MQConstants.DEV_EXCHANGE)
    SubscribableChannel devConsumer();

    /**
     * 消息流出(生产)
     **/
    @Output(MQConstants.DEV_EXCHANGE)
    MessageChannel devProducer();
}

③:生产者代码:

@Component
@EnableBinding(RabbitStream.class)
public class DevProducer {

    private static final Logger logger = LoggerFactory.getLogger(DevProducer.class);

    private final RabbitStream rabbitStream;

    public DevProducer(RabbitStream rabbitStream) {
        this.rabbitStream = rabbitStream;
    }

    public void sendMsg(MQModel model) {
        logger.info("producer:{}", JSON.toJSONString(model));
        rabbitStream.devProducer()
                .send(MessageBuilder.withPayload(model).build());
    }
}

④:费者代码:

@Component
@EnableBinding(RabbitStream.class)
public class DevListener {

    private static final Logger logger = LoggerFactory.getLogger(DevListener.class);

    @StreamListener(MQConstants.DEV_EXCHANGE)
    public void receiveMsgAutoCommit(@Payload String payload) {
        logger.info("consumer:{}", payload);
    }

}

⑥:controller代码:

    @PostMapping(value = "/dev")
    public void dev(@RequestBody MQModel model) {
        devProducer.sendMsg(model);
    }

⑦:测试

发送请求:
drawing

控制台日志:
drawing

3:消息分区

3.1:概念

RabbitMQ 本身是不支持消息分区的,只是由于业务演变+代码控制的一种方案而已(参考spring官方开头文档理解)。
个人理解:所谓消息分区就是将一个大队列拆分 0、1...n 个小队列,
然后分解成 producer-A -> queue-A -> Consumer-A 的一种场景。

drawing

3.2:如何在项目中使用

①:配置
不需要改很多东西,只需要添加少部分配置即可

## RabbitMQ 消息分区配置
spring.cloud.stream.bindings.partition-exchange.destination=partition-exchange
spring.cloud.stream.bindings.partition-exchange.group=partition-queue
spring.cloud.stream.bindings.partition-exchange.content-type=application/json
spring.cloud.stream.bindings.partition-exchange.consumer.concurrency=1
spring.cloud.stream.bindings.partition-exchange.consumer.max-attempts=1
## 消息分区
spring.cloud.stream.bindings.partition-exchange.consumer.partitioned=true
## 分区数量
spring.cloud.stream.bindings.partition-exchange.producer.partition-count=2
## 机器下标,最大值=partition-count-1
spring.cloud.stream.instance-index=0
## 分区策略表达式
spring.cloud.stream.bindings.partition-exchange.producer.partition-key-expression=payload.mid

②:路由规则
然后消息的路由的时候会从payload拿到mid进行条件运算:
mid/2=1则放在应用队列下标为1的队列,mid/2=0则放在队列下标为0的队列。

drawing

③:源码截图
消息的入队前会计算出该消息应该进入哪个队列↓↓↓↓↓↓↓↓↓↓

drawing

可以看到开启分区之后,payload 的类型不是String,而是具备键值对的实体对象。

4:条件消费

4.1:概念

前面说过,Message 是由消息头和消息体组成的。因此可以在发送消息的时候自定义一个key存放在消息头,消费者可以根据自己的消费条件进行消费。
对同一个队列中的消息按条件进行划分再派发给不同的消费者。我的示例就是在header 中设置了一个key。

drawing

4.2:匹配条件讲解

除了可以用 MessageHeader 中的数据进行匹配条件外,payload(消息体)中的数据也可以作为条件。

消息实体结构:

drawing

4.3:测试

代码截图↓↓↓↓↓↓↓↓↓↓↓
drawing


效果↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
drawing

本文GitHub地址
个人理解,不精之处望指出。

推荐阅读