首页 > 技术文章 > php rabbitmq的开发体验(二)

xia-na 2021-05-18 16:35 原文

一、前言

在上一篇rabbitmq开发体验,我们大致介绍和安装上了rabbitmq和php扩展和界面操作rabbitmq的方法,下面正是正式的用我们php来操作消息队列的生产和消费。附上参考的网站:

二、开发经历

对于rabbitmq的php类库,我开发是使用PHP amqplib,composer解决依赖管理。

添加composer.json:

{
    "require": {
        "php-amqplib/php-amqplib": ">=2.6.1"
    }
}
composer install

# 或者 直接运行包引入
composer require php-amqplib/php-amqplib

我的开发框架是yii1.1,核心代码如下,有错误请指正。

1.rabbitmq的连接底层类
<?php

include_once(ROOT_PATH . 'protected/extensions/rabbitmq/autoload.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * rabbitmq工具类
 */
class RabbitMq
{
    protected $connection;
    protected $channel;
    protected $exchange_name;
    protected $query_name;
    protected $route_key_name;

    /**
     * 构造器
     */
    public function __construct()
    {
        //读取文件会导致并发性高时连接失败故写在配置文件
        $config = $GLOBALS['rabbitmq_config'];

        if (!$config)
            throw new \AMQPConnectionException('config error!');

        $this->connection = new AMQPStreamConnection($config['host'], $config['port'], $config['username'], $config['password'], $config['vhost']);
        if (!$this->connection) {
            throw \AMQPConnectionException("Cannot connect to the broker!\n");
        }
        $this->channel = $this->connection->channel();
    }

    /**
     * 日志写入
     * @param $file log文件路径
     * @param $dataStr 报错字符串
     */
    protected function writeLog($file,$dataStr)
    {
        file_put_contents(ROOT_PATH.$file, date('Y-m-d H:i:s').'    '.$dataStr .PHP_EOL, FILE_APPEND);
    }

    /**
     * close link
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }

    /**
     * RabbitMQ destructor
     */
    public function __destruct()
    {
        $this->close();
    }

}

   2.消息的封装类

 

<?php

include_once(ROOT_PATH . 'protected/extensions/rabbitmq/autoload.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class SubMessage
{
    public $message;
    private $routingKey;
    private $params;

    /**
     * SubMessage constructor.
     *
     * @param AMQPMessage $message
     * @param string      $routingKey
     * @param array       $params
     */
    public function __construct(AMQPMessage $message, $routingKey, $params = [])
    {
        $this->params = $params; //额外的参数这里主要存储重试的次数
        $this->message = $message;

        $this->routingKey = $routingKey;
    }

    /**
     * Get AMQP Message
     *
     * @return AMQPMessage
     */
    public function getAMQPMessage()
    {
        return $this->message;
    }

    /**
     * Get original Message
     *
     * @return Message
     */
    public function getMessage()
    {
        return $this->message->body;
    }

    /**
     * Get meta params
     *
     * @return array
     */
    public function getParams()
    {
        return is_array($this->params) ? $this->params : [];
    }

    /**
     * Get meta param
     *
     * @param string $key
     *
     * @return mixed|null
     */
    public function getParam(string $key)
    {
        return isset($this->params[$key]) ? $this->params[$key] : null;
    }

    /**
     * Get routing key
     *
     * @return string
     */
    public function getRoutingKey()
    {
        return $this->routingKey;
    }


}

 

3.消息的核心类

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
/**
 * vm独立站推送
 * 简介Rabbitmq的几种消费模式  https://www.jianshu.com/p/7ac733b7481b
 */
class VmMq extends RabbitMq
{
    protected $exchange_name = 'master'; //主Exchange,发布消息时发布到该Exchange
    protected $exchange_retry_name = 'master.retry'; //重试Exchange,消息处理失败时(3次以内),将消息重新投递给该Exchange
    protected $exchange_failed_name = 'master.failed'; //失败Exchange,超过三次重试失败后,消息投递到该Exchange
    protected $query_name = 'query_vm'; //消费服务需要declare三个队列[queue_name] 队列名称,格式符合 [服务名称]@订阅服务标识
    protected $query_retry_name = 'query_vm@retry';
    protected $query_failed_name = 'query_vm@fail';
    protected $route_key_name = 'route_key_vm'; //路由键名
    /**
     * 构造器
     */
    public function __construct()
    {
        parent::__construct();

        //第2个参数:rabbitmq将给一个消费者一次只发布十条消息。或者说,只有收到消费者上十个消息的完成应答,才给它发布新的消息。如果消费者很忙,消费过慢,队列可能会被填满,这时你需要增加消费者,或者使用其他策略。
        $this->channel->basic_qos(null, 100, false);
        /*
         * 第1个参数:交换机名
         * 第2个参数:声明topic类型交换器
         * 第3个参数:passive消极的,如果该交换机已存在,则不会创建;如果不存在,创建新的交换机。
         * 第4个参数:durable持久的,指定交换机持久
         * 第5个参数:auto_delete,通道关闭后是否删除交换机,自删除的前提是以前有队列连接这个交换器,后来所有与这个交换器绑定的队列或者交换器都与此解绑,
         * Topic交换器非常强大,可以像其他类型的交换器一样工作:
        * 当一个队列的绑定键是"#"是,它将会接收所有的消息,而不再考虑所接收消息的路由键,就像是fanout发布与订阅交换器一样;
        * 当一个队列的绑定键没有用到”#“和”*“时,它又像direct交换一样工作。
         * routing-key是模糊匹配,*可以只替换一个单词,#可以代替零个或多个单词。
         * https://www.cnblogs.com/wuhenzhidu/p/10802749.html
        */
        $this->channel->exchange_declare($this->exchange_name, 'topic', false, true, false);
        $this->channel->exchange_declare($this->exchange_retry_name, 'topic', false, true, false);
        $this->channel->exchange_declare($this->exchange_failed_name, 'topic', false, true, false);
    }

    /**
     * 生产消息
     */
    public function product($data,$priority = 10){
        $unique_messageId = $this->create_guid(); //生成消息的唯一标识,用来幂等性
        if(!is_array($data)){
            $data = array('msg' => $data);
        }
        $uid=isset($data['uid'])?$data['uid']:0;
        $langid = isset($data['langid'])?$data['langid']:1;
        $data['unique_messageId'] = $unique_messageId;
        $data = json_encode($data,JSON_UNESCAPED_UNICODE);

        //存入到表中,保证生产的消息100%到mq队列
        $newModel = DynamicAR::model('nt_vm_message_idempotent');
        $newModel->message_id = $unique_messageId;
        $newModel->uid = $uid;
        $newModel->message_content = $data;
        $newModel->product_status = 0;
        $newModel->consume_status = 0;
        $newModel->create_time = $newModel->update_time = time();
        $newModel->langid = $langid;
        $newModel->priority = $priority;
        $newModel->isNewRecord = true;
        if(!$newModel->save()){
            $this->writeLog('runtime/vm_product_failed.log','数据库保存失败' . json_encode($newModel->getErrors()).$data);
            return;
        }
        //推送成功的ack回调
        $this->channel->set_ack_handler(
            function(AMQPMessage $msg){
                $msgBody = json_decode($msg->getBody(),true);
                if(!isset($msgBody['unique_messageId']) || !$msgBody['unique_messageId']){
                    $this->writeLog('runtime/vm_product_failed.log','获取消费ID为空!' . $msg->getBody());
                    return;
                }
                $unique_messageId = $msgBody['unique_messageId'];
                $criteria = new CDbCriteria;
                $criteria->addCondition("message_id = '".$unique_messageId."'");
                $messageIdempotent = DynamicAR::model('nt_vm_message_idempotent')->find($criteria);
                if (!$messageIdempotent) {
                    $this->writeLog('runtime/vm_product_failed.log','该消息数据库里不存在' . $msg->getBody());
                    return;
                }else{
                    $connection = Yii::app()->db;
                    $command = $connection->createCommand("
                        UPDATE nt_vm_message_idempotent SET product_status=1 WHERE message_id = '$unique_messageId'
                    ");
                    $re = $command->execute();
                    if($re) {
                        $this->writeLog('runtime/vm_product_log.log',$messageIdempotent->message_id . $msg->getBody());
                        return;
                    }else{
                        $this->writeLog('runtime/vm_product_failed.log','数据库保存失败' . $msg->getBody());
                        return;
                    }
                }
            }
        );
        //推送失败的nack回调
        $this->channel->set_nack_handler(
            function(AMQPMessage $message){
                $this->writeLog('runtime/vm_product_failed.log',"消息生产到mq nack ".$message->body);
            }
        );
        //监听交换机或者路由键是否存在
        $returnListener = function (
            $replyCode,
            $replyText,
            $exchange,
            $routingKey,
            $message
        ) {
            $this->writeLog('runtime/vm.log','replyCode ='.$replyCode.';replyText='.$replyText.';exchange='.$exchange.';routingKey='.$routingKey.';body='.$message->body);
        };
        //开启发送消息的return机制
        $this->channel->set_return_listener($returnListener);
        //开启发送消息的ack回调
        $this->channel->confirm_select();

        $msg = new AMQPMessage($data,array(
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, //设置消息持久化
            'priority' => $priority, //消息的优先级 优先级越大越优先
        ));
        $msg->set('application_headers', new AMQPTable([]));
        //推送消息到某个交换机,第三个是路由键为方便即是队列名
        $this->channel->basic_publish($msg, $this->exchange_name,$this->query_name,true);
        //等待发送消息的ack回调消息
        $this->channel->wait_for_pending_acks();
        $this->close();
    }

    protected function create_guid($namespace = '') {
        static $guid = '';
        $uid = uniqid("", true);
        $data = $namespace;
        $data .= $_SERVER['REQUEST_TIME'];
        $data .= isset($_SERVER['HTTP_USER_AGENT'])?$_SERVER['HTTP_USER_AGENT']:'';
        $data .= isset($_SERVER['SERVER_ADDR'])?$_SERVER['SERVER_ADDR']:'';
        $data .= isset($_SERVER['SERVER_PORT'])?$_SERVER['SERVER_PORT']:'';
        $data .= isset($_SERVER['REMOTE_ADDR'])?$_SERVER['REMOTE_ADDR']:'';
        $data .= isset($_SERVER['REMOTE_PORT'])?$_SERVER['REMOTE_PORT']:'';
        $hash = strtoupper(hash('ripemd128', $uid . $guid . md5($data)));
        $guid = '{' .
            substr($hash, 0, 8) .
            '-' .
            substr($hash, 8, 4) .
            '-' .
            substr($hash, 12, 4) .
            '-' .
            substr($hash, 16, 4) .
            '-' .
            substr($hash, 20, 12) .
            '}';
        return $guid;
    }


    /**
     * 消费消息
     */
    public function consume(\Closure $callback,\Closure $shouldExitCallback = null,$priority = 5){
        $this->declareRetryQueue();
        $this->declareConsumeQueue($priority);
        $this->declareFailedQueue();
        //执行上面的步骤主要是为保证这些目标交换机和队列已经存在

        $queueName = $this->query_name;
        $exchangeRetryName = $this->exchange_retry_name;
        $exchangeFailedName = $this->exchange_failed_name;
        // 发起延时重试的回调
        $publishRetry = function ($msg) use ($queueName,$exchangeRetryName) {

            /** @var AMQPTable $headers */
            if ($msg->has('application_headers')) {
                $headers = $msg->get('application_headers');
            } else {
                $headers = new AMQPTable();
            }

            $headers->set('x-orig-routing-key', $this->getOrigRoutingKey($msg));

            $properties = $msg->get_properties();
            $properties['application_headers'] = $headers;
            $newMsg = new AMQPMessage($msg->getBody(), $properties);

            $this->channel->basic_publish(
                $newMsg,
                $exchangeRetryName,
                $queueName
            );
            // 发送ack信息应答当前消息处理完成
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };

        // 将消息发送到失败队列的回调
        $publishFailed = function ($msg) use ($queueName,$exchangeFailedName) {
            $this->channel->basic_publish(
                $msg,
                $exchangeFailedName,
                $queueName
            );
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };

        $this->channel->basic_consume(
            $this->query_name,
            '',     //customer_tag 消费者标签,用来区分多个消费者
            false,  //no_local 若设置为true,表示不能将同一个Conenction中生产者发送的消息传递给这个Connection中的消费者
            false,  //no_ack 是否自动确认消息,true自动确认,false不自动要消费脚本手动调用ack,避免消费异常系统反而自动ack完成
            false,   //exclusive 排他消费者,即这个队列只能由一个消费者消费.适用于任务不允许进行并发处理的情况下.比如系统对接
            false,  //nowait
            // 消费主回调函数
            function(AMQPMessage $msg) use ($callback, $publishRetry, $publishFailed) {
                $retry = $this->getRetryCount($msg);
                try{
                    /*
                     * 需要注意的是:在消费消息之前,先获取消息ID,然后根据ID去数据库中查询是否存在主键为消息ID的记录,如果存在的话,
                     * 说明这条消息之前应该是已经被消费过了,那么就不处理这条消息;如果不存在消费记录的话,则消费者进行消费,消费完成发送确认消息,
                     * 并且将消息记录进行入库。
                     */
                    $msgBody = json_decode($msg->getBody(),true);
                    if(!isset($msgBody['unique_messageId']) || !$msgBody['unique_messageId']){
                        $this->writeLog('runtime/vm_consume_failed.log','获取消费ID为空!' . $msg->getBody());
                        //发送ack信息应答当前消息处理完成
                        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                        return;
                    }
                    $unique_messageId = $msgBody['unique_messageId'];
                    $criteria = new CDbCriteria;
                    $criteria->addCondition("message_id = '".$unique_messageId."'");
                    $messageIdempotent = DynamicAR::model('nt_vm_message_idempotent')->find($criteria);
                    if ($messageIdempotent) {
                        if($messageIdempotent->consume_status == 0){
                            //如果找不到,则进行消费此消息
                            $callback($msg, $publishRetry, $publishFailed);
                        }else{
                            //如果根据消息ID(作为主键)查询出有已经消费过的消息,那么则不进行消费;
                            $this->writeLog('runtime/vm_consume_failed.log','该消息已消费,无须重复消费!' . $msg->getBody());
                            //发送ack信息应答当前消息处理完成
                            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                            return;
                        }
                    } else {
                        //插入太快
                        $this->writeLog('runtime/vm_consume_failed.log','插入太快'. $msg->getBody());
                        $this->retryFail($retry,$msg, $publishRetry, $publishFailed);
                    }
                }catch (Exception $e){
                    $this->writeLog('runtime/vm_exception.log',$e->getMessage());
                    $this->retryFail($retry,$msg, $publishRetry, $publishFailed);
                }

            }
        );
        //监听通道消息 快递员看有没有信,有就立马寄
        while (count($this->channel->callbacks)) {

            if ($shouldExitCallback()) {
                return;
            }

            try {
                $this->channel->wait();
            } catch (AMQPTimeoutException $e) {
            } catch (AMQPIOWaitException $e) {
            }
        }


        $this->close();
    }


    /**
     * 重试失败的消息
     * 注意: 该方法会堵塞执行
     * @param \Closure $callback 回调函数,可以为空,返回true则重新发布,false则丢弃
     */
    public function retryFailed($callback = null)
    {
        $this->declareConsumeQueue();
        $this->declareFailedQueue();

        $queueName = $this->query_name;
        $exchangeName = $this->exchange_name;
        $this->channel->basic_consume(
            $this->query_failed_name,
            '',     //customer_tag
            false,  //no_local
            false,  //no_ack
            true,   //exclusive
            false,  //nowait
            function ($msg) use ($queueName, $exchangeName, $callback) {
                if (is_null($callback) || $callback($msg)) {
                    // 重置header中的x-orig-routing-key属性
                    $msg->set('application_headers', new AMQPTable([
                        'x-orig-routing-key' => $this->getOrigRoutingKey($msg),
                    ]));
                    $this->channel->basic_publish(
                        $msg,
                        $exchangeName,
                        $queueName
                    );
                }

                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            }
        );
        while (count($this->channel->callbacks)) {
            try {
                $this->channel->wait();
            } catch (AMQPTimeoutException $e) {
                return;
            } catch (AMQPIOWaitException $e) {
            }
        }
    }

    /**
     * 获取绑定queue与exchange时的routingkey
     */
    protected function getOrigRoutingKey($msg){
        $retry = null;
        if ($msg->has('application_headers')) {
            $headers = $msg->get('application_headers')->getNativeData();
            if (isset($headers['x-orig-routing-key'])) {
                $retry = $headers['x-orig-routing-key'];
            }
        }
        return $retry?$retry:$msg->get('routing_key');
    }

    /**
     * 获取消息重试次数
     * @param AMQPMessage $msg
     * @return int
     */
    protected function getRetryCount($msg)
    {
        $retry = 0;
        if ($msg->has('application_headers')) {
            $headers = $msg->get('application_headers')->getNativeData();
            if (isset($headers['x-death'][0]['count'])) {
                $retry = $headers['x-death'][0]['count'];
            }
        }

        return (int)$retry;
    }

    /**
     * 消息重试
     */
    public function retryFail($retry,AMQPMessage $msg, $publishRetry, $publishFailed){
        if ($retry >= 3) {
            // 超过最大重试次数,消息无法处理
            $publishFailed($msg);
            return;
        }

        // 消息处理失败,稍后重试
        $publishRetry($msg);
        return;
    }

    /**
     * 声明重试队列
     */
    private function declareRetryQueue()
    {
        $this->channel->queue_declare($this->query_retry_name, false, true, false, false, false,new AMQPTable(array(
            'x-dead-letter-exchange' => $this->exchange_name,
            'x-dead-letter-routing-key' => $this->query_name,
            'x-message-ttl'          => 3 * 1000,
        )));
        $this->channel->queue_bind($this->query_retry_name, $this->exchange_retry_name, $this->query_name);
    }

    /**
     * 声明消费队列
     * @param $priority 消息队列优先级 暂时取个中间值 1-10
     */
    private function declareConsumeQueue($priority = 5)
    {
        //声明队列
        $this->channel->queue_declare(
            $this->query_name,  //队列名称
            false,     //passive消极的,如果该队列已存在,则不会创建;如果不存在,创建新的队列。
            true,      //durable持久的,指定队列持久
            false,   //exclusive独占的,是否能被其他队列访问true排他的。如果一个队列声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。
                                //排它是基于连接可见的,同一个连接不同信道是可以访问同一连接创建的排它队列,“首次”是指如果一个连接已经声明了一个排他队列,
                                //其他连接是不允许建立同名的排他队列,即使这个队列是持久化的,一旦连接关闭或者客户端退出,该排它队列会被自动删除,这种队列适用于一个客户端同时发送与接口消息的场景。
            false,  //设置是否自动删除。当所有消费者都与这个队列断开连接时,这个队列会自动删除。注意: 不是说该队列没有消费者连接时该队列就会自动删除,因为当生产者声明了该队列且没有消费者连接消费时,该队列是不会自动删除的。
            false,       //nowait
            new AMQPTable(array(
                'x-max-priority'         => $priority, //消息队列的优先级
            ))
        );
        //绑定交换机和队列 参数:队列名,交换机名,路由键名
        $this->channel->queue_bind($this->query_name, $this->exchange_name, $this->route_key_name);
        $this->channel->queue_bind($this->query_name, $this->exchange_name, $this->query_name);
    }

    /**
     * 声明消费失败队列
     */
    private function declareFailedQueue()
    {
        $this->channel->queue_declare($this->query_failed_name, false, true, false, false, false);
        $this->channel->queue_bind($this->query_failed_name, $this->exchange_failed_name, $this->query_name);
    }
}

我们将会实现如下功能

  • 结合RabbitMQ的Topic模式和Work Queue模式实现生产方产生消息,消费方按需订阅,消息投递到消费方的队列之后,多个worker同时对消息进行消费
  • 结合RabbitMQ的 Message TTL 和 Dead Letter Exchange 实现消息的延时重试功能
  • 消息达到最大重试次数之后,将其投递到失败队列,等待人工介入处理bug后,重新将其加入队列消费

具体流程见下图

xxx

  1. 生产者发布消息到主Exchange
  2. 主Exchange根据Routing Key将消息分发到对应的消息队列
  3. 多个消费者的worker进程同时对队列中的消息进行消费,因此它们之间采用“竞争”的方式来争取消息的消费
  4. 消息消费后,不管成功失败,都要返回ACK消费确认消息给队列,避免消息消费确认机制导致重复投递,同时,如果消息处理成功,则结束流程,否则进入重试阶段
  5. 如果重试次数小于设定的最大重试次数(3次),则将消息重新投递到Retry Exchange的重试队列
  6. 重试队列不需要消费者直接订阅,它会等待消息的有效时间过期之后,重新将消息投递给Dead Letter Exchange,我们在这里将其设置为主Exchange,实现延时后重新投递消息,这样消费者就可以重新消费消息
  7. 如果三次以上都是消费失败,则认为消息无法被处理,直接将消息投递给Failed Exchange的Failed Queue,这时候应用可以触发报警机制,以通知相关责任人处理
  8. 等待人工介入处理(解决bug)之后,重新将消息投递到主Exchange,这样就可以重新消费了

外部确认消息表结构

CREATE TABLE `nt_vm_message_idempotent` (
  `message_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '消息ID',
  `message_content` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '消息内容',
  `product_status` tinyint(1) UNSIGNED NOT NULL DEFAULT 0 COMMENT '是否生产成功到mq',
  `consume_status` tinyint(1) NOT NULL COMMENT '是否消费成功',
  `create_time` int(10) UNSIGNED NOT NULL DEFAULT 0,
  `update_time` int(10) UNSIGNED NOT NULL DEFAULT 0,
  `priority` tinyint(1) UNSIGNED NOT NULL DEFAULT 0 COMMENT '优先级属性',
  PRIMARY KEY (`message_id`) USING BTREE,
  UNIQUE INDEX `unique_message_id`(`message_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

4.消息的消费脚本

<?php

include_once(ROOT_PATH . 'protected/extensions/rabbitmq/autoload.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Created by PhpStorm.
 * User: tangkeji
 * Date: 21-4-26
 * Time: 下午3:31
 */

class VmMqCommand extends CConsoleCommand {

    private function _Output($data, $isEnd = 0) {
        if (is_array($data) || is_object($data)) {
            var_dump($data);
            echo "\n";
        } else {
            echo $data . "\n";
        }

        if ($isEnd) {
            Yii::app()->end();
        }
    }


    /**
     * 消息进程
     */
    public function actionRun(){
        if(LibCommon::isRunCommand('vmmq run')===true)return true;
        $stopped = false;
        // 自动退出计数器,当值小于1的时候退出
        // 发生异常-20,正常执行每次-2 (是否开启判断关闭)

        $autoExitCounter = 200;

        // 信号处理,接收到SIGUSR2信号的时候自动退出
        // 注意:环境必须是php7.1+才支持
//        if (function_exists('pcntl_async_signals')) {
//            pcntl_async_signals(true);
//        }
//
//        if (function_exists('pcntl_signal')) {
//            pcntl_signal(SIGUSR2, function ($sig) use (&$stopped) {
//                $stopped = true;
//            });
//        }

        $mq = new VmMq();
        $callback = function (AMQPMessage $msg, $publishRetry, $publishFailed) use
        (
            &$autoExitCounter
        ) {
            $retry = $this->getRetryCount($msg);

            try {
                $routingKey = $this->getOrigRoutingKey($msg);
                $subMessage = new SubMessage($msg, $routingKey , [
                    'retry_count' => $retry, // 重试次数
                ]);

                $this->subscribe($subMessage,$retry, $publishRetry, $publishFailed);

                //$autoExitCounter = $autoExitCounter - 2;

            } catch (\Exception $ex) {
                //$autoExitCounter = $autoExitCounter - 20; // 发生普通异常,退出计数器-20(关闭)
                $this->writeLog('runtime/vm_consume_failed.log', '消费失败!' . $ex->getMessage() . $msg->getBody());
                $this->retryFail($retry,$msg,$publishRetry,$publishFailed);
            }
        };
        $mq->consume(
            $callback,
            function () use (&$stopped, &$autoExitCounter) {
                return $stopped || $autoExitCounter < 1;
            }
        );
    }

    /**
     * 消息重试
     */
    public function retryFail($retry,AMQPMessage $msg, $publishRetry, $publishFailed){
        if ($retry >= 3) {
            // 超过最大重试次数,消息无法处理
            $publishFailed($msg);
            return;
        }

        // 消息处理失败,稍后重试
        $publishRetry($msg);
        return;
    }

    /**
     * 获取消息重试次数
     * @param AMQPMessage $msg
     * @return int
     */
    protected function getRetryCount($msg)
    {
        $retry = 0;
        if ($msg->has('application_headers')) {
            $headers = $msg->get('application_headers')->getNativeData();
            if (isset($headers['x-death'][0]['count'])) {
                $retry = $headers['x-death'][0]['count'];
            }
        }

        return (int)$retry;
    }

    /**
     * 订阅消息处理
     * @param \Aicode\RabbitMQ\SubMessage $msg
     * @param $retry
     * @param $publishRetry
     * @param $publishFailed
     * @return bool 处理成功返回true(返回true后将会对消息进行处理确认),失败throw 异常
     */
    public function subscribe($msg,$retry,$publishRetry, $publishFailed)
    {
        // TODO 业务逻辑实现

//        throw new Exception("消费异常!!!");
        echo sprintf(
            "subscriber:<%s> %s %s\n",
            $msg->getRoutingKey(),
            $retry,
            $msg->getMessage()
        );
        echo "----------------------------------------\n";
        //存入到表中,标识该消息已消费
        $msgBody = json_decode($msg->getMessage(),true);
        if(!isset($msgBody['unique_messageId']) || !$msgBody['unique_messageId']){
            $this->writeLog('runtime/vm_consume_failed.log','获取消费ID为空!' . $msg->getMessage());
            //发送ack信息应答当前消息处理完成
            $msg->message->delivery_info['channel']->basic_ack($msg->message->delivery_info['delivery_tag']);
            return;
        }
        $unique_messageId = $msgBody['unique_messageId'];
        $criteria = new CDbCriteria;
        $criteria->addCondition("message_id = '".$unique_messageId."'");
        $messageIdempotent = DynamicAR::model('nt_vm_message_idempotent')->find($criteria);
        //如果找到,则更新数据库消费状态
        if ($messageIdempotent && $messageIdempotent->consume_status == 0) {
            try {
                LibCommon::doMqPull($msgBody);

                $update_time = time();
                $connection = Yii::app()->db;
                $command = $connection->createCommand("
                            UPDATE nt_vm_message_idempotent SET consume_status=1,update_time='$update_time' WHERE message_id = '$unique_messageId'
                        ");
                $re = $command->execute();
                if ($re) {
//                    $this->writeLog('runtime/vm_consume_log.log', $messageIdempotent->message_id . $msg->getMessage());
                    //发送ack信息应答当前消息处理完成
                    $msg->message->delivery_info['channel']->basic_ack($msg->message->delivery_info['delivery_tag']);
                    return;
                } else {
                    $this->writeLog('runtime/vm_consume_failed.log', '数据库保存失败' . $msg->getMessage());
                    $this->retryFail($retry,$msg->message, $publishRetry, $publishFailed);
                }
            }catch (Exception $e) {
                echo $e->getMessage();
                $this->writeLog('runtime/vm_consume_failed.log', '消费失败!' . $e->getMessage() . $msg->getMessage());
                $this->retryFail($retry,$msg->message, $publishRetry, $publishFailed);
            }
        } else {
            //如果根据消息ID(作为主键)查询出有已经消费过的消息,那么则不进行消费;
            $this->writeLog('runtime/vm_consume_failed.log','该消息已消费,无须重复消费!' . $msg->getMessage());
            //发送ack信息应答当前消息处理完成
            $msg->message->delivery_info['channel']->basic_ack($msg->message->delivery_info['delivery_tag']);
        }
        return true;

    }

    private function getOrigRoutingKey(AMQPMessage $msg)
    {

        $retry = null;
        if ($msg->has('application_headers')) {
            $headers = $msg->get('application_headers')->getNativeData();
            if (isset($headers['x-orig-routing-key'])) {
                $retry = $headers['x-orig-routing-key'];
            }
        }
        return $retry?$retry:$msg->get('routing_key');
    }

    /**
     * 日志写入
     * @param $file log文件路径
     * @param $dataStr 报错字符串
     */
    protected function writeLog($file,$dataStr)
    {
        file_put_contents(ROOT_PATH.$file, date('Y-m-d H:i:s').'    '.$dataStr .PHP_EOL, FILE_APPEND);
    }
}

三、总结

以上是我的rabbitmq从0到有的经历,可能里面有不完美或者错误请大家指出,必会好好纠正,主要我这个消息要保证消息的可靠性,不容许丢失。里面用到rabbitmq的高级特性如ack确认机制,幂等性,限流机制,重回机制,ttl,死信队列(相当于失败消息的回收站)。

RabbitMQ消息模式(消息100%的投递、幂等性概念) https://blog.csdn.net/weixin_42687829/article/details/104327711

推荐阅读