首页 > 解决方案 > 如何在 C# 中从 MSMQ 读取数据?(请注意 - 数据不是用 C# 编写的)

问题描述

我在我的项目中使用 nodejs、msmq 和 .net 作为技术堆栈。我正在使用 nodejs 脚本从硬件收集数据并写入 MSMQ。我想在 .net 中阅读这个 MSMQ,但我遇到了错误。此外,我也无法收到该消息。

错误 System.Xml.XmlException:根级别的数据无效。第 1 行,位于 System.Xml.XmlTextReaderImpl.Throw(String res, String arg) 'objMessage.ConnectorType' 的 System.Xml.XmlTextReaderImpl.Throw(Exception e) 的位置引发了“System.InvalidOperationException”“objMessage.Body”类型的异常' 抛出类型为 'System.Xml.XmlException' 的异常

//Nodejs script
const msmq = require('updated-node-msmq');
const queue = msmq.openOrCreateQueue('.\\Private$\\EEG');
queue.send(records);   
// C# code
if (MessageQueue.Exists(@".\Private$\EEG")){
    objMessageQueue = new MessageQueue(@".\Private$\EEG");
}
objMessage = objMessageQueue.Receive();
objMessage.Formatter = new XmlMessageFormatter(new Type[] 
     {typeof(Payload) });
var message = (Payload)objMessage.Body;

标签: c#.netnode.jsmsmq

解决方案


我能够解决这个问题。我没有使用 message.Body,而是使用了 message.BodyStream 并将该数据转换为二进制格式。这是我的解决方案。

   using System;
using System.Messaging;
using System.Data.SqlClient;
using System.Data;
using Newtonsoft.Json;
using System.IO;
using System.Text;

namespace NeuroskyListener
{
    class Program
    {
        public static bool isRunning;
        private static MessageQueue messageQueue;

        public static void Main(string[] args)
        {
            InitializeQueue();
            Console.ReadLine();
        }

        private static void InitializeQueue()
        {
            string queuePath = @".\Private$\eegNew";

            if (!MessageQueue.Exists(queuePath))
            {
                messageQueue = MessageQueue.Create(queuePath);
            }
            else
            {
                messageQueue = new MessageQueue(queuePath);
            }
            isRunning = true;
            messageQueue.Formatter = new XmlMessageFormatter(new Type[] {typeof(string) });
            messageQueue.ReceiveCompleted += OnReceiveCompleted;
            messageQueue.BeginReceive();
        }

        private static void OnReceiveCompleted(object source, ReceiveCompletedEventArgs asyncResult)
        {
            try
            {   
                MessageQueue mq =  (MessageQueue)source;

                if (mq != null)
                {
                    try
                    {
                        System.Messaging.Message message = null;
                        try
                        {
                            message = mq.EndReceive(asyncResult.AsyncResult);
                            BinaryReader reader = new BinaryReader(message.BodyStream);

                            int count = (int)message.BodyStream.Length;

                            byte[] bytes = reader.ReadBytes(count);


                           string bodyString = Encoding.UTF8.GetString(bytes);
                            eegPayload lst = JsonConvert.DeserializeObject<eegPayload>(bodyString);

                        }
                        catch (Exception ex)
                        {
                           // LogMessage(ex.Message);
                        }
                        if (message != null)
                        {
                            //Payload payload = message.Body as Payload;

                            Console.WriteLine(message.Body);

                            //if (payload != null)
                            //{
                            //    receivedCounter++;
                            //    if ((receivedCounter % 10000) == 0)
                            //    {
                            //        string messageText = string.Format("Received {0} messages", receivedCounter);
                            //        LogMessage(messageText);
                            //    }
                            //}
                        }
                    }
                    finally
                    {
                        if (isRunning)
                        {
                            mq.BeginReceive();
                        }
                    }
                }
                return;
            }
            catch (Exception exc)
            {
                //LogMessage(exc.Message);
            }
        }
    }
}

推荐阅读