一、前言
在上一篇rabbitmq开发体验,我们大致介绍和安装上了rabbitmq和php扩展和界面操作rabbitmq的方法,下面正是正式的用我们php来操作消息队列的生产和消费。附上参考的网站:
- rabbitmq官网 https://www.rabbitmq.com/ 全英文看起来吃力 官方入门文档:https://www.rabbitmq.com/getstarted.html
- 官方入门文档 https://www.cnblogs.com/grimm/p/5728736.html 此博客主对官方php案例的解释
- RabbitMQ发布订阅实战-实现延时重试队列 代码项目:https://github.com/mylxsw/rabbitmq-pubsub-php
- rabbitmq高级特性 B站视频 https://www.bilibili.com/video/BV1S5411H7ef?from=search&seid=8055368004001009131
- RabbitMQ的ack或nack机制使用不当导致的队列堵塞或死循环问题 https://blog.csdn.net/lisheng19870305/article/details/112849495
- RabbitMQ 延时消息队列
- Rabbitmq之高级特性——百分百投递消息&消息确认模式&消息返回模式实现
- RabbitMQ:消费者优先级的介绍和使用 https://blog.csdn.net/weixin_45492007/article/details/106189961
- RabbitMQ消息模式(消息100%的投递、幂等性概念) https://blog.csdn.net/weixin_42687829/article/details/104327711
- 简介Rabbitmq的几种消费模式 https://www.jianshu.com/p/7ac733b7481b
二、开发经历
对于rabbitmq的php类库,我开发是使用PHP amqplib,composer解决依赖管理。
添加composer.json:
{ "require": { "php-amqplib/php-amqplib": ">=2.6.1" } } composer install # 或者 直接运行包引入 composer require php-amqplib/php-amqplib
我的开发框架是yii1.1,核心代码如下,有错误请指正。
1.rabbitmq的连接底层类
<?php include_once(ROOT_PATH . 'protected/extensions/rabbitmq/autoload.php'); use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; /** * rabbitmq工具类 */ class RabbitMq { protected $connection; protected $channel; protected $exchange_name; protected $query_name; protected $route_key_name; /** * 构造器 */ public function __construct() { //读取文件会导致并发性高时连接失败故写在配置文件 $config = $GLOBALS['rabbitmq_config']; if (!$config) throw new \AMQPConnectionException('config error!'); $this->connection = new AMQPStreamConnection($config['host'], $config['port'], $config['username'], $config['password'], $config['vhost']); if (!$this->connection) { throw \AMQPConnectionException("Cannot connect to the broker!\n"); } $this->channel = $this->connection->channel(); } /** * 日志写入 * @param $file log文件路径 * @param $dataStr 报错字符串 */ protected function writeLog($file,$dataStr) { file_put_contents(ROOT_PATH.$file, date('Y-m-d H:i:s').' '.$dataStr .PHP_EOL, FILE_APPEND); } /** * close link */ public function close() { $this->channel->close(); $this->connection->close(); } /** * RabbitMQ destructor */ public function __destruct() { $this->close(); } }
2.消息的封装类
<?php include_once(ROOT_PATH . 'protected/extensions/rabbitmq/autoload.php'); use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; class SubMessage { public $message; private $routingKey; private $params; /** * SubMessage constructor. * * @param AMQPMessage $message * @param string $routingKey * @param array $params */ public function __construct(AMQPMessage $message, $routingKey, $params = []) { $this->params = $params; //额外的参数这里主要存储重试的次数 $this->message = $message; $this->routingKey = $routingKey; } /** * Get AMQP Message * * @return AMQPMessage */ public function getAMQPMessage() { return $this->message; } /** * Get original Message * * @return Message */ public function getMessage() { return $this->message->body; } /** * Get meta params * * @return array */ public function getParams() { return is_array($this->params) ? $this->params : []; } /** * Get meta param * * @param string $key * * @return mixed|null */ public function getParam(string $key) { return isset($this->params[$key]) ? $this->params[$key] : null; } /** * Get routing key * * @return string */ public function getRoutingKey() { return $this->routingKey; } }
3.消息的核心类
<?php use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; /** * vm独立站推送 * 简介Rabbitmq的几种消费模式 https://www.jianshu.com/p/7ac733b7481b */ class VmMq extends RabbitMq { protected $exchange_name = 'master'; //主Exchange,发布消息时发布到该Exchange protected $exchange_retry_name = 'master.retry'; //重试Exchange,消息处理失败时(3次以内),将消息重新投递给该Exchange protected $exchange_failed_name = 'master.failed'; //失败Exchange,超过三次重试失败后,消息投递到该Exchange protected $query_name = 'query_vm'; //消费服务需要declare三个队列[queue_name] 队列名称,格式符合 [服务名称]@订阅服务标识 protected $query_retry_name = 'query_vm@retry'; protected $query_failed_name = 'query_vm@fail'; protected $route_key_name = 'route_key_vm'; //路由键名 /** * 构造器 */ public function __construct() { parent::__construct(); //第2个参数:rabbitmq将给一个消费者一次只发布十条消息。或者说,只有收到消费者上十个消息的完成应答,才给它发布新的消息。如果消费者很忙,消费过慢,队列可能会被填满,这时你需要增加消费者,或者使用其他策略。 $this->channel->basic_qos(null, 100, false); /* * 第1个参数:交换机名 * 第2个参数:声明topic类型交换器 * 第3个参数:passive消极的,如果该交换机已存在,则不会创建;如果不存在,创建新的交换机。 * 第4个参数:durable持久的,指定交换机持久 * 第5个参数:auto_delete,通道关闭后是否删除交换机,自删除的前提是以前有队列连接这个交换器,后来所有与这个交换器绑定的队列或者交换器都与此解绑, * Topic交换器非常强大,可以像其他类型的交换器一样工作: * 当一个队列的绑定键是"#"是,它将会接收所有的消息,而不再考虑所接收消息的路由键,就像是fanout发布与订阅交换器一样; * 当一个队列的绑定键没有用到”#“和”*“时,它又像direct交换一样工作。 * routing-key是模糊匹配,*可以只替换一个单词,#可以代替零个或多个单词。 * https://www.cnblogs.com/wuhenzhidu/p/10802749.html */ $this->channel->exchange_declare($this->exchange_name, 'topic', false, true, false); $this->channel->exchange_declare($this->exchange_retry_name, 'topic', false, true, false); $this->channel->exchange_declare($this->exchange_failed_name, 'topic', false, true, false); } /** * 生产消息 */ public function product($data,$priority = 10){ $unique_messageId = $this->create_guid(); //生成消息的唯一标识,用来幂等性 if(!is_array($data)){ $data = array('msg' => $data); } $uid=isset($data['uid'])?$data['uid']:0; $langid = isset($data['langid'])?$data['langid']:1; $data['unique_messageId'] = $unique_messageId; $data = json_encode($data,JSON_UNESCAPED_UNICODE); //存入到表中,保证生产的消息100%到mq队列 $newModel = DynamicAR::model('nt_vm_message_idempotent'); $newModel->message_id = $unique_messageId; $newModel->uid = $uid; $newModel->message_content = $data; $newModel->product_status = 0; $newModel->consume_status = 0; $newModel->create_time = $newModel->update_time = time(); $newModel->langid = $langid; $newModel->priority = $priority; $newModel->isNewRecord = true; if(!$newModel->save()){ $this->writeLog('runtime/vm_product_failed.log','数据库保存失败' . json_encode($newModel->getErrors()).$data); return; } //推送成功的ack回调 $this->channel->set_ack_handler( function(AMQPMessage $msg){ $msgBody = json_decode($msg->getBody(),true); if(!isset($msgBody['unique_messageId']) || !$msgBody['unique_messageId']){ $this->writeLog('runtime/vm_product_failed.log','获取消费ID为空!' . $msg->getBody()); return; } $unique_messageId = $msgBody['unique_messageId']; $criteria = new CDbCriteria; $criteria->addCondition("message_id = '".$unique_messageId."'"); $messageIdempotent = DynamicAR::model('nt_vm_message_idempotent')->find($criteria); if (!$messageIdempotent) { $this->writeLog('runtime/vm_product_failed.log','该消息数据库里不存在' . $msg->getBody()); return; }else{ $connection = Yii::app()->db; $command = $connection->createCommand(" UPDATE nt_vm_message_idempotent SET product_status=1 WHERE message_id = '$unique_messageId' "); $re = $command->execute(); if($re) { $this->writeLog('runtime/vm_product_log.log',$messageIdempotent->message_id . $msg->getBody()); return; }else{ $this->writeLog('runtime/vm_product_failed.log','数据库保存失败' . $msg->getBody()); return; } } } ); //推送失败的nack回调 $this->channel->set_nack_handler( function(AMQPMessage $message){ $this->writeLog('runtime/vm_product_failed.log',"消息生产到mq nack ".$message->body); } ); //监听交换机或者路由键是否存在 $returnListener = function ( $replyCode, $replyText, $exchange, $routingKey, $message ) { $this->writeLog('runtime/vm.log','replyCode ='.$replyCode.';replyText='.$replyText.';exchange='.$exchange.';routingKey='.$routingKey.';body='.$message->body); }; //开启发送消息的return机制 $this->channel->set_return_listener($returnListener); //开启发送消息的ack回调 $this->channel->confirm_select(); $msg = new AMQPMessage($data,array( 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, //设置消息持久化 'priority' => $priority, //消息的优先级 优先级越大越优先 )); $msg->set('application_headers', new AMQPTable([])); //推送消息到某个交换机,第三个是路由键为方便即是队列名 $this->channel->basic_publish($msg, $this->exchange_name,$this->query_name,true); //等待发送消息的ack回调消息 $this->channel->wait_for_pending_acks(); $this->close(); } protected function create_guid($namespace = '') { static $guid = ''; $uid = uniqid("", true); $data = $namespace; $data .= $_SERVER['REQUEST_TIME']; $data .= isset($_SERVER['HTTP_USER_AGENT'])?$_SERVER['HTTP_USER_AGENT']:''; $data .= isset($_SERVER['SERVER_ADDR'])?$_SERVER['SERVER_ADDR']:''; $data .= isset($_SERVER['SERVER_PORT'])?$_SERVER['SERVER_PORT']:''; $data .= isset($_SERVER['REMOTE_ADDR'])?$_SERVER['REMOTE_ADDR']:''; $data .= isset($_SERVER['REMOTE_PORT'])?$_SERVER['REMOTE_PORT']:''; $hash = strtoupper(hash('ripemd128', $uid . $guid . md5($data))); $guid = '{' . substr($hash, 0, 8) . '-' . substr($hash, 8, 4) . '-' . substr($hash, 12, 4) . '-' . substr($hash, 16, 4) . '-' . substr($hash, 20, 12) . '}'; return $guid; } /** * 消费消息 */ public function consume(\Closure $callback,\Closure $shouldExitCallback = null,$priority = 5){ $this->declareRetryQueue(); $this->declareConsumeQueue($priority); $this->declareFailedQueue(); //执行上面的步骤主要是为保证这些目标交换机和队列已经存在 $queueName = $this->query_name; $exchangeRetryName = $this->exchange_retry_name; $exchangeFailedName = $this->exchange_failed_name; // 发起延时重试的回调 $publishRetry = function ($msg) use ($queueName,$exchangeRetryName) { /** @var AMQPTable $headers */ if ($msg->has('application_headers')) { $headers = $msg->get('application_headers'); } else { $headers = new AMQPTable(); } $headers->set('x-orig-routing-key', $this->getOrigRoutingKey($msg)); $properties = $msg->get_properties(); $properties['application_headers'] = $headers; $newMsg = new AMQPMessage($msg->getBody(), $properties); $this->channel->basic_publish( $newMsg, $exchangeRetryName, $queueName ); // 发送ack信息应答当前消息处理完成 $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; // 将消息发送到失败队列的回调 $publishFailed = function ($msg) use ($queueName,$exchangeFailedName) { $this->channel->basic_publish( $msg, $exchangeFailedName, $queueName ); $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $this->channel->basic_consume( $this->query_name, '', //customer_tag 消费者标签,用来区分多个消费者 false, //no_local 若设置为true,表示不能将同一个Conenction中生产者发送的消息传递给这个Connection中的消费者 false, //no_ack 是否自动确认消息,true自动确认,false不自动要消费脚本手动调用ack,避免消费异常系统反而自动ack完成 false, //exclusive 排他消费者,即这个队列只能由一个消费者消费.适用于任务不允许进行并发处理的情况下.比如系统对接 false, //nowait // 消费主回调函数 function(AMQPMessage $msg) use ($callback, $publishRetry, $publishFailed) { $retry = $this->getRetryCount($msg); try{ /* * 需要注意的是:在消费消息之前,先获取消息ID,然后根据ID去数据库中查询是否存在主键为消息ID的记录,如果存在的话, * 说明这条消息之前应该是已经被消费过了,那么就不处理这条消息;如果不存在消费记录的话,则消费者进行消费,消费完成发送确认消息, * 并且将消息记录进行入库。 */ $msgBody = json_decode($msg->getBody(),true); if(!isset($msgBody['unique_messageId']) || !$msgBody['unique_messageId']){ $this->writeLog('runtime/vm_consume_failed.log','获取消费ID为空!' . $msg->getBody()); //发送ack信息应答当前消息处理完成 $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); return; } $unique_messageId = $msgBody['unique_messageId']; $criteria = new CDbCriteria; $criteria->addCondition("message_id = '".$unique_messageId."'"); $messageIdempotent = DynamicAR::model('nt_vm_message_idempotent')->find($criteria); if ($messageIdempotent) { if($messageIdempotent->consume_status == 0){ //如果找不到,则进行消费此消息 $callback($msg, $publishRetry, $publishFailed); }else{ //如果根据消息ID(作为主键)查询出有已经消费过的消息,那么则不进行消费; $this->writeLog('runtime/vm_consume_failed.log','该消息已消费,无须重复消费!' . $msg->getBody()); //发送ack信息应答当前消息处理完成 $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); return; } } else { //插入太快 $this->writeLog('runtime/vm_consume_failed.log','插入太快'. $msg->getBody()); $this->retryFail($retry,$msg, $publishRetry, $publishFailed); } }catch (Exception $e){ $this->writeLog('runtime/vm_exception.log',$e->getMessage()); $this->retryFail($retry,$msg, $publishRetry, $publishFailed); } } ); //监听通道消息 快递员看有没有信,有就立马寄 while (count($this->channel->callbacks)) { if ($shouldExitCallback()) { return; } try { $this->channel->wait(); } catch (AMQPTimeoutException $e) { } catch (AMQPIOWaitException $e) { } } $this->close(); } /** * 重试失败的消息 * 注意: 该方法会堵塞执行 * @param \Closure $callback 回调函数,可以为空,返回true则重新发布,false则丢弃 */ public function retryFailed($callback = null) { $this->declareConsumeQueue(); $this->declareFailedQueue(); $queueName = $this->query_name; $exchangeName = $this->exchange_name; $this->channel->basic_consume( $this->query_failed_name, '', //customer_tag false, //no_local false, //no_ack true, //exclusive false, //nowait function ($msg) use ($queueName, $exchangeName, $callback) { if (is_null($callback) || $callback($msg)) { // 重置header中的x-orig-routing-key属性 $msg->set('application_headers', new AMQPTable([ 'x-orig-routing-key' => $this->getOrigRoutingKey($msg), ])); $this->channel->basic_publish( $msg, $exchangeName, $queueName ); } $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } ); while (count($this->channel->callbacks)) { try { $this->channel->wait(); } catch (AMQPTimeoutException $e) { return; } catch (AMQPIOWaitException $e) { } } } /** * 获取绑定queue与exchange时的routingkey */ protected function getOrigRoutingKey($msg){ $retry = null; if ($msg->has('application_headers')) { $headers = $msg->get('application_headers')->getNativeData(); if (isset($headers['x-orig-routing-key'])) { $retry = $headers['x-orig-routing-key']; } } return $retry?$retry:$msg->get('routing_key'); } /** * 获取消息重试次数 * @param AMQPMessage $msg * @return int */ protected function getRetryCount($msg) { $retry = 0; if ($msg->has('application_headers')) { $headers = $msg->get('application_headers')->getNativeData(); if (isset($headers['x-death'][0]['count'])) { $retry = $headers['x-death'][0]['count']; } } return (int)$retry; } /** * 消息重试 */ public function retryFail($retry,AMQPMessage $msg, $publishRetry, $publishFailed){ if ($retry >= 3) { // 超过最大重试次数,消息无法处理 $publishFailed($msg); return; } // 消息处理失败,稍后重试 $publishRetry($msg); return; } /** * 声明重试队列 */ private function declareRetryQueue() { $this->channel->queue_declare($this->query_retry_name, false, true, false, false, false,new AMQPTable(array( 'x-dead-letter-exchange' => $this->exchange_name, 'x-dead-letter-routing-key' => $this->query_name, 'x-message-ttl' => 3 * 1000, ))); $this->channel->queue_bind($this->query_retry_name, $this->exchange_retry_name, $this->query_name); } /** * 声明消费队列 * @param $priority 消息队列优先级 暂时取个中间值 1-10 */ private function declareConsumeQueue($priority = 5) { //声明队列 $this->channel->queue_declare( $this->query_name, //队列名称 false, //passive消极的,如果该队列已存在,则不会创建;如果不存在,创建新的队列。 true, //durable持久的,指定队列持久 false, //exclusive独占的,是否能被其他队列访问true排他的。如果一个队列声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。 //排它是基于连接可见的,同一个连接不同信道是可以访问同一连接创建的排它队列,“首次”是指如果一个连接已经声明了一个排他队列, //其他连接是不允许建立同名的排他队列,即使这个队列是持久化的,一旦连接关闭或者客户端退出,该排它队列会被自动删除,这种队列适用于一个客户端同时发送与接口消息的场景。 false, //设置是否自动删除。当所有消费者都与这个队列断开连接时,这个队列会自动删除。注意: 不是说该队列没有消费者连接时该队列就会自动删除,因为当生产者声明了该队列且没有消费者连接消费时,该队列是不会自动删除的。 false, //nowait new AMQPTable(array( 'x-max-priority' => $priority, //消息队列的优先级 )) ); //绑定交换机和队列 参数:队列名,交换机名,路由键名 $this->channel->queue_bind($this->query_name, $this->exchange_name, $this->route_key_name); $this->channel->queue_bind($this->query_name, $this->exchange_name, $this->query_name); } /** * 声明消费失败队列 */ private function declareFailedQueue() { $this->channel->queue_declare($this->query_failed_name, false, true, false, false, false); $this->channel->queue_bind($this->query_failed_name, $this->exchange_failed_name, $this->query_name); } }
我们将会实现如下功能
- 结合RabbitMQ的Topic模式和Work Queue模式实现生产方产生消息,消费方按需订阅,消息投递到消费方的队列之后,多个worker同时对消息进行消费
- 结合RabbitMQ的 Message TTL 和 Dead Letter Exchange 实现消息的延时重试功能
- 消息达到最大重试次数之后,将其投递到失败队列,等待人工介入处理bug后,重新将其加入队列消费
具体流程见下图
- 生产者发布消息到主Exchange
- 主Exchange根据Routing Key将消息分发到对应的消息队列
- 多个消费者的worker进程同时对队列中的消息进行消费,因此它们之间采用“竞争”的方式来争取消息的消费
- 消息消费后,不管成功失败,都要返回ACK消费确认消息给队列,避免消息消费确认机制导致重复投递,同时,如果消息处理成功,则结束流程,否则进入重试阶段
- 如果重试次数小于设定的最大重试次数(3次),则将消息重新投递到Retry Exchange的重试队列
- 重试队列不需要消费者直接订阅,它会等待消息的有效时间过期之后,重新将消息投递给Dead Letter Exchange,我们在这里将其设置为主Exchange,实现延时后重新投递消息,这样消费者就可以重新消费消息
- 如果三次以上都是消费失败,则认为消息无法被处理,直接将消息投递给Failed Exchange的Failed Queue,这时候应用可以触发报警机制,以通知相关责任人处理
- 等待人工介入处理(解决bug)之后,重新将消息投递到主Exchange,这样就可以重新消费了
外部确认消息表结构
CREATE TABLE `nt_vm_message_idempotent` (
`message_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '消息ID',
`message_content` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '消息内容',
`product_status` tinyint(1) UNSIGNED NOT NULL DEFAULT 0 COMMENT '是否生产成功到mq',
`consume_status` tinyint(1) NOT NULL COMMENT '是否消费成功',
`create_time` int(10) UNSIGNED NOT NULL DEFAULT 0,
`update_time` int(10) UNSIGNED NOT NULL DEFAULT 0,
`priority` tinyint(1) UNSIGNED NOT NULL DEFAULT 0 COMMENT '优先级属性',
PRIMARY KEY (`message_id`) USING BTREE,
UNIQUE INDEX `unique_message_id`(`message_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
4.消息的消费脚本
<?php include_once(ROOT_PATH . 'protected/extensions/rabbitmq/autoload.php'); use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; /** * Created by PhpStorm. * User: tangkeji * Date: 21-4-26 * Time: 下午3:31 */ class VmMqCommand extends CConsoleCommand { private function _Output($data, $isEnd = 0) { if (is_array($data) || is_object($data)) { var_dump($data); echo "\n"; } else { echo $data . "\n"; } if ($isEnd) { Yii::app()->end(); } } /** * 消息进程 */ public function actionRun(){ if(LibCommon::isRunCommand('vmmq run')===true)return true; $stopped = false; // 自动退出计数器,当值小于1的时候退出 // 发生异常-20,正常执行每次-2 (是否开启判断关闭) $autoExitCounter = 200; // 信号处理,接收到SIGUSR2信号的时候自动退出 // 注意:环境必须是php7.1+才支持 // if (function_exists('pcntl_async_signals')) { // pcntl_async_signals(true); // } // // if (function_exists('pcntl_signal')) { // pcntl_signal(SIGUSR2, function ($sig) use (&$stopped) { // $stopped = true; // }); // } $mq = new VmMq(); $callback = function (AMQPMessage $msg, $publishRetry, $publishFailed) use ( &$autoExitCounter ) { $retry = $this->getRetryCount($msg); try { $routingKey = $this->getOrigRoutingKey($msg); $subMessage = new SubMessage($msg, $routingKey , [ 'retry_count' => $retry, // 重试次数 ]); $this->subscribe($subMessage,$retry, $publishRetry, $publishFailed); //$autoExitCounter = $autoExitCounter - 2; } catch (\Exception $ex) { //$autoExitCounter = $autoExitCounter - 20; // 发生普通异常,退出计数器-20(关闭) $this->writeLog('runtime/vm_consume_failed.log', '消费失败!' . $ex->getMessage() . $msg->getBody()); $this->retryFail($retry,$msg,$publishRetry,$publishFailed); } }; $mq->consume( $callback, function () use (&$stopped, &$autoExitCounter) { return $stopped || $autoExitCounter < 1; } ); } /** * 消息重试 */ public function retryFail($retry,AMQPMessage $msg, $publishRetry, $publishFailed){ if ($retry >= 3) { // 超过最大重试次数,消息无法处理 $publishFailed($msg); return; } // 消息处理失败,稍后重试 $publishRetry($msg); return; } /** * 获取消息重试次数 * @param AMQPMessage $msg * @return int */ protected function getRetryCount($msg) { $retry = 0; if ($msg->has('application_headers')) { $headers = $msg->get('application_headers')->getNativeData(); if (isset($headers['x-death'][0]['count'])) { $retry = $headers['x-death'][0]['count']; } } return (int)$retry; } /** * 订阅消息处理 * @param \Aicode\RabbitMQ\SubMessage $msg * @param $retry * @param $publishRetry * @param $publishFailed * @return bool 处理成功返回true(返回true后将会对消息进行处理确认),失败throw 异常 */ public function subscribe($msg,$retry,$publishRetry, $publishFailed) { // TODO 业务逻辑实现 // throw new Exception("消费异常!!!"); echo sprintf( "subscriber:<%s> %s %s\n", $msg->getRoutingKey(), $retry, $msg->getMessage() ); echo "----------------------------------------\n"; //存入到表中,标识该消息已消费 $msgBody = json_decode($msg->getMessage(),true); if(!isset($msgBody['unique_messageId']) || !$msgBody['unique_messageId']){ $this->writeLog('runtime/vm_consume_failed.log','获取消费ID为空!' . $msg->getMessage()); //发送ack信息应答当前消息处理完成 $msg->message->delivery_info['channel']->basic_ack($msg->message->delivery_info['delivery_tag']); return; } $unique_messageId = $msgBody['unique_messageId']; $criteria = new CDbCriteria; $criteria->addCondition("message_id = '".$unique_messageId."'"); $messageIdempotent = DynamicAR::model('nt_vm_message_idempotent')->find($criteria); //如果找到,则更新数据库消费状态 if ($messageIdempotent && $messageIdempotent->consume_status == 0) { try { LibCommon::doMqPull($msgBody); $update_time = time(); $connection = Yii::app()->db; $command = $connection->createCommand(" UPDATE nt_vm_message_idempotent SET consume_status=1,update_time='$update_time' WHERE message_id = '$unique_messageId' "); $re = $command->execute(); if ($re) { // $this->writeLog('runtime/vm_consume_log.log', $messageIdempotent->message_id . $msg->getMessage()); //发送ack信息应答当前消息处理完成 $msg->message->delivery_info['channel']->basic_ack($msg->message->delivery_info['delivery_tag']); return; } else { $this->writeLog('runtime/vm_consume_failed.log', '数据库保存失败' . $msg->getMessage()); $this->retryFail($retry,$msg->message, $publishRetry, $publishFailed); } }catch (Exception $e) { echo $e->getMessage(); $this->writeLog('runtime/vm_consume_failed.log', '消费失败!' . $e->getMessage() . $msg->getMessage()); $this->retryFail($retry,$msg->message, $publishRetry, $publishFailed); } } else { //如果根据消息ID(作为主键)查询出有已经消费过的消息,那么则不进行消费; $this->writeLog('runtime/vm_consume_failed.log','该消息已消费,无须重复消费!' . $msg->getMessage()); //发送ack信息应答当前消息处理完成 $msg->message->delivery_info['channel']->basic_ack($msg->message->delivery_info['delivery_tag']); } return true; } private function getOrigRoutingKey(AMQPMessage $msg) { $retry = null; if ($msg->has('application_headers')) { $headers = $msg->get('application_headers')->getNativeData(); if (isset($headers['x-orig-routing-key'])) { $retry = $headers['x-orig-routing-key']; } } return $retry?$retry:$msg->get('routing_key'); } /** * 日志写入 * @param $file log文件路径 * @param $dataStr 报错字符串 */ protected function writeLog($file,$dataStr) { file_put_contents(ROOT_PATH.$file, date('Y-m-d H:i:s').' '.$dataStr .PHP_EOL, FILE_APPEND); } }
三、总结
以上是我的rabbitmq从0到有的经历,可能里面有不完美或者错误请大家指出,必会好好纠正,主要我这个消息要保证消息的可靠性,不容许丢失。里面用到rabbitmq的高级特性如ack确认机制,幂等性,限流机制,重回机制,ttl,死信队列(相当于失败消息的回收站)。
RabbitMQ消息模式(消息100%的投递、幂等性概念) https://blog.csdn.net/weixin_42687829/article/details/104327711