c# - 如何在 C# 中从 MSMQ 读取数据?(请注意 - 数据不是用 C# 编写的)
问题描述
我在我的项目中使用 nodejs、msmq 和 .net 作为技术堆栈。我正在使用 nodejs 脚本从硬件收集数据并写入 MSMQ。我想在 .net 中阅读这个 MSMQ,但我遇到了错误。此外,我也无法收到该消息。
错误 System.Xml.XmlException:根级别的数据无效。第 1 行,位于 System.Xml.XmlTextReaderImpl.Throw(String res, String arg) 'objMessage.ConnectorType' 的 System.Xml.XmlTextReaderImpl.Throw(Exception e) 的位置引发了“System.InvalidOperationException”“objMessage.Body”类型的异常' 抛出类型为 'System.Xml.XmlException' 的异常
//Nodejs script
const msmq = require('updated-node-msmq');
const queue = msmq.openOrCreateQueue('.\\Private$\\EEG');
queue.send(records);
// C# code
if (MessageQueue.Exists(@".\Private$\EEG")){
objMessageQueue = new MessageQueue(@".\Private$\EEG");
}
objMessage = objMessageQueue.Receive();
objMessage.Formatter = new XmlMessageFormatter(new Type[]
{typeof(Payload) });
var message = (Payload)objMessage.Body;
解决方案
我能够解决这个问题。我没有使用 message.Body,而是使用了 message.BodyStream 并将该数据转换为二进制格式。这是我的解决方案。
using System;
using System.Messaging;
using System.Data.SqlClient;
using System.Data;
using Newtonsoft.Json;
using System.IO;
using System.Text;
namespace NeuroskyListener
{
class Program
{
public static bool isRunning;
private static MessageQueue messageQueue;
public static void Main(string[] args)
{
InitializeQueue();
Console.ReadLine();
}
private static void InitializeQueue()
{
string queuePath = @".\Private$\eegNew";
if (!MessageQueue.Exists(queuePath))
{
messageQueue = MessageQueue.Create(queuePath);
}
else
{
messageQueue = new MessageQueue(queuePath);
}
isRunning = true;
messageQueue.Formatter = new XmlMessageFormatter(new Type[] {typeof(string) });
messageQueue.ReceiveCompleted += OnReceiveCompleted;
messageQueue.BeginReceive();
}
private static void OnReceiveCompleted(object source, ReceiveCompletedEventArgs asyncResult)
{
try
{
MessageQueue mq = (MessageQueue)source;
if (mq != null)
{
try
{
System.Messaging.Message message = null;
try
{
message = mq.EndReceive(asyncResult.AsyncResult);
BinaryReader reader = new BinaryReader(message.BodyStream);
int count = (int)message.BodyStream.Length;
byte[] bytes = reader.ReadBytes(count);
string bodyString = Encoding.UTF8.GetString(bytes);
eegPayload lst = JsonConvert.DeserializeObject<eegPayload>(bodyString);
}
catch (Exception ex)
{
// LogMessage(ex.Message);
}
if (message != null)
{
//Payload payload = message.Body as Payload;
Console.WriteLine(message.Body);
//if (payload != null)
//{
// receivedCounter++;
// if ((receivedCounter % 10000) == 0)
// {
// string messageText = string.Format("Received {0} messages", receivedCounter);
// LogMessage(messageText);
// }
//}
}
}
finally
{
if (isRunning)
{
mq.BeginReceive();
}
}
}
return;
}
catch (Exception exc)
{
//LogMessage(exc.Message);
}
}
}
}
推荐阅读
- javascript - 接收数据后异步/等待.map 错误?
- airtable - 通过 API 请求列出所有 Airtable 表
- installation - 从提取的安装文件创建 MSI
- vba - 动态复制和特殊粘贴
- jhipster - JHipster - How to generate Entity with a field as List of String?
- apache - 如何将 jsr 223 采样器中的采样器统计信息写入 jmeter 侦听器
- ios - 更改 TitleTextAttributes 文本颜色取决于导航栏颜色
- elastic-stack - filebeat 收集器的最大 close_inactive 时间
- hibernate - 带有连接的休眠条件查询 - 避免从第二个表中进行完全选择
- python - Airflow BashOperator 针对特定路径