c# - 如何从远程 IBM MQ 不断获取消息
问题描述
我创建了一个 Windows 服务,它会连接到远程 MQ 并以 MQSTR 格式获取消息;收到消息后,我并没有关闭与远程 MQ 的连接。这个服务本应不断检查远程 MQ 中是否有数据,但实际上每收到一条消息后,我都必须重新启动服务,才能从远程 MQ 获取下一条消息。谁能告诉我需要怎么做才能持续地从远程 MQ 获取消息?任何线索或链接都可以,请帮忙。
我的 C# windows 服务代码是这样的:
Program.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.ServiceProcess;
using System.Text;
using System.Threading.Tasks;
namespace MQ_listner
{
/// <summary>
/// Process entry point for the MQ listener Windows service.
/// </summary>
static class Program
{
    /// <summary>
    /// Hands a single <see cref="Service1"/> instance to the service control
    /// manager via <see cref="ServiceBase.Run(ServiceBase[])"/>.
    /// </summary>
    static void Main()
    {
        ServiceBase.Run(new ServiceBase[] { new Service1() });
    }
}
}
Service1.cs
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.ServiceProcess;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace MQ_listner
{
/// <summary>
/// Windows service that starts a background worker thread; the worker creates
/// an <see cref="MQReader"/> and asks it to connect to the queue manager.
/// </summary>
public partial class Service1 : ServiceBase
{
    private MQReader MQReader;
    private string _serviceName = "MQ_Listener";
    private DateTime _TimeStart;
    private bool _run = true;
    private Thread _thread;
    // Milliseconds OnStop waits for the worker thread; 0 means "do not wait".
    int WaitWhenStop = 0;
    private DateTime _TimeEnd;
    private TimeSpan _TimeDifference;
    private TimeSpan _TimeElasped = new TimeSpan(0);

    public Service1()
    {
        InitializeComponent();
    }

    /// <summary>
    /// Records the start time, logs it, and launches the worker thread.
    /// Any exception is written to the event log instead of being rethrown.
    /// </summary>
    /// <param name="args">Start arguments supplied by the service control manager (unused).</param>
    protected override void OnStart(string[] args)
    {
        try
        {
            // Must be assigned before it is logged and before OnStop computes
            // the run time; otherwise it stays at DateTime.MinValue.
            _TimeStart = DateTime.Now;
            EventLog.WriteEntry(_serviceName + " was started at " + _TimeStart.ToString());
            _run = true;
            _thread = new Thread(new ThreadStart(StartMQListenerService));
            _thread.IsBackground = true;
            _thread.Start();
        }
        catch (Exception ex)
        {
            EventLog.WriteEntry(_serviceName + " was not started . Error Message : " + ex.ToString());
        }
    }

    /// <summary>
    /// Signals the worker to stop, waits up to <c>WaitWhenStop</c> milliseconds
    /// for it, and logs the stop time together with the accumulated run time.
    /// </summary>
    protected override void OnStop()
    {
        _run = false;
        // _thread is null when OnStart failed before the thread was created.
        if (_thread != null)
        {
            _thread.Join(WaitWhenStop);
        }
        _TimeEnd = DateTime.Now;
        _TimeDifference = _TimeEnd.Subtract(_TimeStart);
        _TimeElasped = _TimeElasped.Add(_TimeDifference);
        EventLog.WriteEntry(_serviceName + " was stopped at " + _TimeEnd.ToString() + "\r\n ran for total time :" + _TimeElasped.ToString());
    }

    // MQ connection service
    /// <summary>
    /// Worker-thread body: creates the <see cref="MQReader"/> once and calls
    /// <c>InitializeConnections</c> on it. On failure the error is logged and
    /// the service is asked to stop through the <c>NET stop</c> command.
    /// </summary>
    public void StartMQListenerService()
    {
        try
        {
            if (_run && MQReader == null)
            {
                MQReader = new MQReader();
                MQReader.InitializeConnections();
                EventLog.WriteEntry(_serviceName + " MQ connection is established");
            }
        }
        catch (Exception ex)
        {
            System.Diagnostics.EventLog.WriteEntry(_serviceName, ex.ToString());
            System.Diagnostics.ProcessStartInfo startinfo = new System.Diagnostics.ProcessStartInfo();
            startinfo.WindowStyle = System.Diagnostics.ProcessWindowStyle.Hidden;
            startinfo.FileName = "NET";
            // The space matters: without it the command becomes "NET stop<name>".
            startinfo.Arguments = "stop " + this.ServiceName;
            Process.Start(startinfo);
        }
    }
}
}
MQReader.cs
using System;
using IBM.WMQ;
using System.Diagnostics;
using System.IO;
using System.Xml;
using System.Linq;
using System.Xml.Linq;
using System.Configuration;
namespace MQ_listner
{
/// <summary>
/// Reads a single message from an IBM MQ queue, using connection settings
/// taken from the application configuration file.
/// </summary>
internal class MQReader
{
    public MQReader()
    {
    }

    /// <summary>
    /// Connects to the queue manager, opens the queue for input, gets one
    /// message and writes its text (or the MQ error text) to
    /// <c>C:\documents\Example.txt</c>. The queue is closed and the queue
    /// manager disconnected before returning, whether or not the read worked.
    /// </summary>
    /// <remarks>
    /// Reads the app settings <c>QueueManager</c>, <c>Queuename</c>,
    /// <c>Channel</c> and <c>ConnectionName</c>. Exceptions other than
    /// <see cref="MQException"/> propagate to the caller after cleanup.
    /// </remarks>
    public void InitializeConnections()
    {
        string queueManagerName = ConfigurationManager.AppSettings["QueueManager"];
        string queueName = ConfigurationManager.AppSettings["Queuename"];
        string channelName = ConfigurationManager.AppSettings["Channel"];
        string connectionName = ConfigurationManager.AppSettings["ConnectionName"];

        MQQueueManager queueManager = null;
        MQQueue queue = null;
        string strReturn = "";
        try
        {
            queueManager = new MQQueueManager(queueManagerName,
                channelName, connectionName);
            strReturn = "Connected Successfully";
            queue = queueManager.AccessQueue(queueName,
                MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING);
            MQMessage queueMessage = new MQMessage();
            queueMessage.Format = MQC.MQFMT_STRING;
            MQGetMessageOptions queueGetMessageOptions = new MQGetMessageOptions();
            queue.Get(queueMessage, queueGetMessageOptions);
            strReturn = queueMessage.ReadString(queueMessage.MessageLength);
        }
        catch (MQException exp)
        {
            strReturn = "Exception: " + exp.Message;
        }
        finally
        {
            // Everything that was opened must be released, otherwise each call
            // leaves an open queue handle and connection behind.
            ReleaseResources(queue, queueManager);
        }

        string path1 = @"C:\documents\Example.txt";
        System.IO.File.WriteAllText(path1, strReturn);
    }

    // Closes the queue and disconnects the queue manager; cleanup failures are
    // traced rather than thrown so they cannot mask the result of the read.
    private static void ReleaseResources(MQQueue queue, MQQueueManager queueManager)
    {
        try
        {
            if (queue != null)
            {
                queue.Close();
            }
        }
        catch (MQException closeEx)
        {
            Trace.WriteLine("Queue close failed: " + closeEx.Message);
        }

        try
        {
            if (queueManager != null)
            {
                queueManager.Disconnect();
            }
        }
        catch (MQException disconnectEx)
        {
            Trace.WriteLine("Queue manager disconnect failed: " + disconnectEx.Message);
        }
    }
}
}
谁能告诉我我的代码有什么问题?我需要在这里添加什么,才能持续地从远程 MQ 获取消息?请帮忙。任何链接或线索都可以。
编辑
运行一段时间后,我需要重新启动服务才能从远程 MQ 获取数据。你能告诉我为什么 Windows 服务需要重新启动才能获取数据吗?有什么线索或想法吗?
解决方案
您的队列关闭和队列管理器断开连接在哪里?如果您连接和/或打开某些东西,您必须确保您关闭并断开它。我强烈建议您参加 MQ 编程课程。或者去参加MQ 技术会议,那里有关于编程 MQ 的会议。
我在“MQQueueManager 消息池”一帖中发布了一个功能齐全的 C# MQ 程序,该程序可以检索队列中的所有消息。
这是您的 MQReader 类的更新版本,它应该能为您提供正确的思路。注意:我没有测试它,测试就留给您了。:)
此外,您应该将连接信息放入 Hashtable 并将 Hashtable 传递给 MQQueueManager 类。
using System;
using IBM.WMQ;
using System.Diagnostics;
using System.IO;
using System.Xml;
using System.Linq;
using System.Xml.Linq;
using System.Configuration;
namespace MQ_listner
{
/// <summary>
/// Long-running IBM MQ reader: opens a queue once, then keeps getting
/// messages from it until <see cref="StopIt"/> is called or a severe MQ error
/// occurs, and finally closes the queue and disconnects.
/// </summary>
internal class MQReader
{
    // Port used when the "Port" app setting is missing or not a valid integer.
    private const int DefaultPort = 1414;

    private MQQueueManager qManager = null;
    private MQQueue inQ = null;
    // Written by StopIt() from another thread while LoopThruMessages() reads it.
    private volatile bool running = true;

    public MQReader()
    {
    }

    /// <summary>
    /// Connects to the queue manager and opens the input queue using the app
    /// settings <c>ConnectionName</c>, <c>Channel</c>, <c>Port</c>,
    /// <c>UserID</c>, <c>Password</c>, <c>QueueManager</c> and <c>Queuename</c>.
    /// </summary>
    /// <returns><c>true</c> when both the connect and the open succeeded;
    /// <c>false</c> when an <see cref="MQException"/> was raised.</returns>
    public bool InitQMgrAndQueue()
    {
        bool flag = true;
        System.Collections.Hashtable qMgrProp = new System.Collections.Hashtable();
        qMgrProp.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_MANAGED);
        qMgrProp.Add(MQC.HOST_NAME_PROPERTY, ConfigurationManager.AppSettings["ConnectionName"]);
        qMgrProp.Add(MQC.CHANNEL_PROPERTY, ConfigurationManager.AppSettings["Channel"]);

        // TryParse returns false for a missing, malformed or out-of-range
        // value, so every bad case falls back to the default port.
        int port;
        if (!System.Int32.TryParse(ConfigurationManager.AppSettings["Port"], out port))
        {
            port = DefaultPort;
        }
        qMgrProp.Add(MQC.PORT_PROPERTY, port);

        if (ConfigurationManager.AppSettings["UserID"] != null)
        {
            qMgrProp.Add(MQC.USER_ID_PROPERTY, ConfigurationManager.AppSettings["UserID"]);
        }
        if (ConfigurationManager.AppSettings["Password"] != null)
        {
            qMgrProp.Add(MQC.PASSWORD_PROPERTY, ConfigurationManager.AppSettings["Password"]);
        }

        try
        {
            qManager = new MQQueueManager(ConfigurationManager.AppSettings["QueueManager"],
                qMgrProp);
            System.Console.Out.WriteLine("Connected Successfully");
            inQ = qManager.AccessQueue(ConfigurationManager.AppSettings["Queuename"],
                MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING);
            System.Console.Out.WriteLine("Open queue Successfully");
        }
        catch (MQException mqex)
        {
            System.Console.Out.WriteLine("MQException CC=" + mqex.CompletionCode + " : RC=" + mqex.ReasonCode);
            flag = false;
        }
        return flag;
    }

    /// <summary>
    /// Gets messages from the open queue in a loop and writes each message
    /// body to the console. An empty queue is not an error: the get waits up
    /// to the configured interval and the loop simply tries again. The queue
    /// and queue manager are always released when the loop ends.
    /// </summary>
    public void LoopThruMessages()
    {
        MQGetMessageOptions gmo = new MQGetMessageOptions();
        gmo.Options |= MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
        gmo.WaitInterval = 2500; // 2.5 seconds wait time or use MQC.MQEI_UNLIMITED to wait forever

        try
        {
            if (inQ == null)
            {
                // InitQMgrAndQueue() was not called or did not succeed.
                System.Console.Out.WriteLine("Queue is not open");
                return;
            }

            while (running)
            {
                try
                {
                    // A fresh message object per get, so no state from the
                    // previous message carries over.
                    MQMessage msg = new MQMessage();
                    inQ.Get(msg, gmo);
                    System.Console.Out.WriteLine("Message Data: " + msg.ReadString(msg.MessageLength));
                }
                catch (MQException mqex)
                {
                    if (mqex.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                    {
                        // no message - life is good - loop again
                    }
                    else
                    {
                        running = false; // severe error - time to exit
                        System.Console.Out.WriteLine("MQException CC=" + mqex.CompletionCode + " : RC=" + mqex.ReasonCode);
                    }
                }
                catch (System.IO.IOException ioex)
                {
                    System.Console.Out.WriteLine("ioex=" + ioex);
                }
            }
        }
        finally
        {
            // Runs even when an unexpected exception escapes the loop.
            CloseQueue();
            DisconnectQueueManager();
        }
    }

    /// <summary>
    /// Asks <see cref="LoopThruMessages"/> to exit after its current get returns.
    /// </summary>
    public void StopIt()
    {
        running = false;
    }

    // Closes the input queue if it is open; MQ errors are reported, not thrown.
    private void CloseQueue()
    {
        try
        {
            if (inQ != null)
            {
                inQ.Close();
                System.Console.Out.WriteLine("Closed queue");
            }
        }
        catch (MQException mqex)
        {
            System.Console.Out.WriteLine("MQException CC=" + mqex.CompletionCode + " : RC=" + mqex.ReasonCode);
        }
    }

    // Disconnects from the queue manager if connected; MQ errors are reported, not thrown.
    private void DisconnectQueueManager()
    {
        try
        {
            if (qManager != null)
            {
                qManager.Disconnect();
                System.Console.Out.WriteLine("disconnected from queue manager");
            }
        }
        catch (MQException mqex)
        {
            System.Console.Out.WriteLine("MQException CC=" + mqex.CompletionCode + " : RC=" + mqex.ReasonCode);
        }
    }
}
}
每当您停止服务时,请确保它调用 MQReader 中的 StopIt 方法。
推荐阅读
- angular - Angular 5:身份验证保护自动导航到指定组件
- octave - 如何在使用 mmap() 分配的共享内存中实例化 Octave 矩阵?
- xml - Shell:如果属性不存在,则在 XML 元素中添加属性
- sql-server - 将 SQL 中的行分隔为不同的列,用破折号 (-) 和管道 (|) 分隔
- seaborn - FacetGrid Seaborn 需要很长时间才能绘制
- r - R查找超过2个向量的匹配项
- node.js - 将 Node.js 应用程序部署到 Azure
- common-lisp - SBCL 特定声明
- c# - 将 Double 转换为字符串(完整数字)C#
- nfc - Android Things 和 NFC