首页 > 技术文章 > 消息队列MSMQ

become 2018-09-06 14:57 原文

一、MSMQ介绍和安装消息队列

原理:消息的发送者把自己想要发送的信息放入一个容器中(我们称之为message),然后把它保存至一个系统公用空间的消息队列(message queue)中;本地或者是异地的消息接收程序再从该队列中取出发给它的消息进行处理。 

消息可以以两种方式发送,即快递方式(express)和可恢复模式(recoverable),它们的区别在于,快递方式为了消息的快速传递,把消息放置于内存中,而不放于物理磁盘上,以获取较高的处理能力;可恢复模式在传送过程的每一步骤中,都把消息写入物理磁盘中,以得到较好的故障恢复能力。消息队列可以放置在发送方、接收方所在的机器上,也可以单独放置在另外一台机器上。 

好处:由于是异步通信,无论是发送方还是接收方都不用等待对方返回成功消息,就可以执行余下的代码,因而大大地提高了事物处理的能力;当信息传送过程中,信息发送机制具有一定功能的故障恢复能力;msmq的消息传递机制使得消息通信的双方具有不同的物理平台成为可能

具体的安装过程就是在控制面板里“添加/删除程序”下“添加/删除Windows组件”,完成添加就OK。安装完成后就可以通过交互界添加新的消息队列。

二、创建、删除和管理队列     

在.NET环境下编写Message Queue程序的前提就是需要先安装MSMQ,本文之前已经作了详细的介绍。要开发MSMQ程序就必须学习一个很重要的类(MessageQueue),该类位于名称空间System.Messageing下。其中有几个常用的方法必须掌握:  --Create方法:创建使用指定路径的新消息队列。
  --Delete方法:删除现有的消息队列。
  --Existe方法:查看指定消息队列是否存在。
  --GetAllMessages()方法:得到队列中的所有消息。
  --GetPublicQueues方法:在“消息队列”网络中定位消息队列。
  --Peek/BeginPeek方法:查看某个特定队列中的消息队列,但不从该队列中移出消息。
  --Receive/BeginReceive方法:检索指定消息队列中最前面的消息并将其从该队列中移除。
  --Send方法:发送消息到指定的消息队列。
  --Purge方法:清空指定队列的消息。

 

三、发送和序列化消息    

 MSMQ消息队列中定义的消息由一个主体(body)和若干属性构成。消息的主体可以由文本、二进制构成,根据需要还可以被加密。在MSMQ 中消息的大小不能够超过4MB。发送消息是通过Send方法来完成的,需要一个Message参数。
1、发送消息:
     步骤:连接队列-->指定消息格式-->提供要发送的数据(主体)-->调用Send()方法将消息发送出去。详细见后面的示例程序。
     
2、序列化消息:
     消息序列化可以通过.NET Framework附带的三个预定义格式化程序来完成:
    --  XMLMessageFormatter对象----MessageQueue组件的默认格式化程序设置。
    --  BinaryMessageFormatter对象;
    --  ActiveXMessageFormatter对象; 
    由于后两者格式化后的消息通常不能为人阅读,所以我们经常用到的是XMLMessageFormatter对象。该对象构造方法有三种重载:

1public XmlMessageFormatter();
2public XmlMessageFormatter(string[] targetTypeNames);
3public XmlMessageFormatter(Type[] targetTypes);

如我们后面的示例程序中用到的序列化语句:

XmlMessageFormatter formatter = new XmlMessageFormatter(new Type[] { typeof(string) });

四、读取和接收消息

1、读取消息:
    也就是从指定队列中获取消息,详细请查看本文前面的关于消息操作的方法介绍。
2、接收消息有两种方式:
    --> 通过Receive方法--具体功能请返回本文前面有详细介绍。
    --> 通过Peek方法--具体功能请返回本文前面有详细介绍。

 

五、消息使用实例

        /// <summary>

        /// 1.通过Create方法创建使用指定路径的新消息队列

        /// </summary>

        /// <param name="queuePath"></param>

        public void Createqueue(string queuePath)
        {

            try
            {

                if (!MessageQueue.Exists(queuePath))
                {

                    MessageQueue.Create(queuePath);

                }

                else
                {

                    Console.WriteLine(queuePath + "已经存在!");

                    //MessageQueue.Delete(queuePath);

                    //MessageQueue.Create(queuePath);

                    //Console.WriteLine(queuePath + "删除重建");

                }

                Path = queuePath;

            }

            catch (MessageQueueException e)
            {

                Console.WriteLine(e.Message);

            }

        }



        /// <summary>

        ///  2.连接消息队列并发送消息到队列

        /// 远程模式:MessageQueue rmQ = new MessageQueue("FormatName:Direct=OS:machinename//private$//queue");

        ///     rmQ.Send("sent to regular queue - Atul");对于外网的MSMQ只能发不能收

        /// </summary>

        public void SendMessage(string sendmsg)
        {

            try
            {

                //连接到本地队列

                MessageQueue myQueue = new MessageQueue(Path);

                //MessageQueue myQueue = new MessageQueue("FormatName:Direct=TCP:192.168.12.79//Private$//myQueue1");

                //MessageQueue rmQ = new MessageQueue("FormatName:Direct=TCP:121.0.0.1//private$//queue");--远程格式

                Message myMessage = new Message();

                myMessage.Body = sendmsg;

                myMessage.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });

                //发生消息到队列中

                myQueue.Send(myMessage);

                Console.WriteLine("消息发送成功!");

                //Console.ReadLine();

            }

            catch (ArgumentException e)
            {

                //Console.WriteLine(e.Message);

            }

        }



        /// <summary>

        /// 3.连接消息队列并从队列中接收消息

        /// </summary>

        public void ReceiveMessage()
        {

            MessageQueue myQueue = new MessageQueue(Path);

            myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });

            try
            {

                //从队列中接收消息

                Message myMessage = myQueue.Receive();// myQueue.Peek();--接收后不消息从队列中移除

                string context = myMessage.Body.ToString();

                Console.WriteLine("消息内容:" + context);

                //Console.ReadLine();

            }

            catch (MessageQueueException e)
            {

                Console.WriteLine(e.Message);

            }

            catch (InvalidCastException e)
            {

                Console.WriteLine(e.Message);

            }

        }



        /// <summary>

        /// 4.清空指定队列的消息

        /// </summary>

        public void ClealMessage()
        {

            MessageQueue myQueue = new MessageQueue(Path);

            myQueue.Purge();

            Console.WriteLine("已清空对了{0}上的所有消息", Path);

        }



        /// <summary>

        /// 5.连接队列并获取队列的全部消息

        /// </summary>

        public void GetAllMessage()
        {

            MessageQueue myQueue = new MessageQueue(Path);

            Message[] allMessage = myQueue.GetAllMessages();

            XmlMessageFormatter formatter = new XmlMessageFormatter(new Type[] { typeof(string) });

            for (int i = 0; i < allMessage.Length; i++)
            {

                allMessage[i].Formatter = formatter;

                Console.WriteLine("第{0}机密消息为:{1}", i + 1, allMessage[i].Body.ToString());

            }

            //Console.ReadLine();

        }

 

 

推荐阅读