首页 > 技术文章 > 利用rabbitmq 实现消息的延迟发送

zhangchenglzhao 2019-08-26 14:52 原文

需求: 实现消息的延迟通知,每5s, 30s,60s,120s 通知一次。 就是每隔一段时间执行一次方法,该方法做业务上的处理。

网上查rabbitmq原生是不支持延迟消息的。(rocketmq 支持), 但是可以换种方式实现: 利用其死信队列。

rabbitmq的队列或消息可以设置过期时间,过期后会将消息放入你设置的队列中,如

<rabbit:queue name="notify.use.delay" durable="true" auto-delete="false" exclusive="false">
        <rabbit:queue-arguments>
            <!-- 消息过期根据重新路由 -->
            <entry key="x-dead-letter-exchange" value="notifyExchange"/>
            <entry key="x-dead-letter-routing-key" value="notify.use.active"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <!-- 定义direct exchange,绑定queue -->
    <rabbit:direct-exchange name="notifyExchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="notify.use.delay" key="notify.use.delay"></rabbit:binding>
            <rabbit:binding queue="notify.use.active" key="notify.use.active"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <bean id="notifyListener" class="com.shdy.job.NotifyListener"/>
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="notify.use.active" ref="notifyListener"/>
    </rabbit:listener-container>

消息过期后自动转入 notify.use.active 队列中。 然后设置一个监听,消费该队列既可以实现。

//模拟发送消息type的格式是 A_S.5 ,  其中S 表示通知成功,通知成功就不在加入下个队列了,5 表示5s时间。 
@Resource
    private AmqpTemplate amqpTemplate;
    @ResponseBody
    @RequestMapping(value = "/xx", method = RequestMethod.GET,produces = "application/json;charset=UTF-8")
    public String xx(String type){
        String[] x = type.split("\\.");
        amqpTemplate.convertAndSend("notifyExchange","notify.use.delay",type,message -> {
            MessageProperties messageProperties = message.getMessageProperties();
            messageProperties.setExpiration(Integer.parseInt(x[1])*1000 + "");
            return message;
        });

        return "success";
    }


public class NotifyListener implements MessageListener {
    @Resource
    private AmqpTemplate amqpTemplate;
    @Override
    public void onMessage(Message m) {
        String type = new String(m.getBody());
        System.out.println("-------------------------------------------"+m);
        if(!type.contains("S")){
            String[] x = type.split("\\.");
        // 模拟如果通知不成功就将过期时间乘以2(根据自己业务变动),再次放入延迟队列中。 amqpTemplate.convertAndSend(
"notifyExchange","notify.use.delay"+Integer.parseInt(x[1])*2,type,message -> { MessageProperties messageProperties = message.getMessageProperties(); messageProperties.setExpiration(Integer.parseInt(x[1])*1000*2 + ""); //细节:使用该方法需spring-core4.2.6以上 return message; }); } } }

注意: 但是测试发现一个问题: 10s 消息比后来加入的5s 的消息先通知。网上查到原因是: 只有到达队列顶部的消息才会去验证队列过期时间,

因为10s 的消息是先加入的,所以在顶部,等待10s 到期后才执行,所以5s 反而在后面执行。 

解决方式是定义多个延迟队列,每个队列只放一种过期时间的消息。 如 

notify.use.delay_0(延迟5s),notify.use.delay_1(延迟30s),notify.use.delay_2 . 

在存放的时候可以取出当前消息的延迟时间,如1 ,然后加1, 放入下一个队列, 这时候发送的消息类如 : A_S.0 , B_O.0 。
int kk = Integer.parseInt(x[1])+1
amqpTemplate.convertAndSend("notifyExchange","notify.use.delay_"+kk,x[0]+"."+kk)

整个队列的过期时间可以直接设置,不用每个消息单独设置:
<rabbit:queue name="notify.use.delay" durable="true" auto-delete="false" exclusive="false">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl">
                <!-- 队列默认消息过期时间 -->
                <value type="java.lang.Long">5000</value>
            </entry>
            <!-- 消息过期根据重新路由 -->
            <entry key="x-dead-letter-exchange" value="notifyExchange"/>
            <entry key="x-dead-letter-routing-key" value="notify.use.active"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

 

 

 

推荐阅读