c# - 每个集群节点上的 IBM MQ 和轮询
问题描述
去年我开发了一个队列监视器,它使用 System.Reactive.Linq 来检查 IBM MQ 总线上是否有消息
代码如下
public class QueueMonitor : IObservable<Message>, IDisposable
{
private string queueName;
private readonly MQQueue mqQueue;
private readonly MQQueueManager queueManager;
private readonly IDisposable timer;
private readonly object lockObj = new object();
private bool isChecking;
private readonly List<IObserver<Message>> observers;
public QueueMonitor(MQQueueManager queueManager, string queueName)
{
this.queueName = queueName;
this.queueManager = queueManager;
observers = new List<IObserver<Message>>();
mqQueue = queueManager.AccessQueue(queueName,
MQC.MQOO_INPUT_AS_Q_DEF // open queue for input
+ MQC.MQOO_FAIL_IF_QUIESCING); // but not if MQM stopping
timer = Observable.Interval(TimeSpan.FromSeconds(5)).Subscribe(_ =>
{
lock (lockObj)
{
if (!isChecking)
{
isChecking = true;
var mqMsg = new MQMessage();
var mqGetMsgOpts = new MQGetMessageOptions {WaitInterval = 1};
// 15 second limit for waiting
mqGetMsgOpts.Options |= MQC.MQGMO_WAIT;
try
{
mqQueue.Get(mqMsg, mqGetMsgOpts);
if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
{
var text = mqMsg.ReadString(mqMsg.MessageLength);
System.Console.WriteLine(text);
Message message = new Message { Content = text };
foreach (var observer in observers)
observer.OnNext(message);
}
else
{
System.Console.WriteLine("Non-text message");
}
}
catch (MQException ex)
{
if ((ex.Message == "MQRC_NO_MSG_AVAILABLE"))
{
//nothing to do, emtpy queue
}
else
{
//log
}
}
finally
{
isChecking = false;
}
}
}
});
}
public IDisposable Subscribe(IObserver<Message> observer)
{
if (!observers.Contains(observer))
observers.Add(observer);
return new Unsubscriber(observers, observer);
}
public void Dispose()
{
((IDisposable)mqQueue)?.Dispose();
((IDisposable)queueManager)?.Dispose();
timer?.Dispose();
}
}
public class Unsubscriber : IDisposable
{
private readonly List<IObserver<Message>> _observers;
private readonly IObserver<Message> _observer;
public Unsubscriber(List<IObserver<Message>> observers, IObserver<Message> observer)
{
this._observers = observers;
this._observer = observer;
}
public void Dispose()
{
if (_observer != null) _observers.Remove(_observer);
}
}
这工作了将近一年,但现在有两件事需要修复,希望你能帮助我把它做好。
1)如果重启了IBMMQ,目前QueueMonitor没有收到新的传入消息,需要重启。
我应该如何处理?不知道Monitor端有没有重启IBM MQ。
2) 更复杂。我们正在迁移到一个新的平衡 IBMMQ 集群。它有 4 个配置为活动的活动节点。它们都在负载均衡器后面,所以当我在总线上放一条消息时,我将它发送到一个地址。
对于发送消息,这很简单。我遇到的问题是当我需要从队列中读取时。因为有 4 个不同的 IBMMQ 节点,有 4 个 IP。我怎么知道总线上已经发送了一条消息?我不能简单地听平衡器,因为它没有通知。我应该 ping 4 个节点吗?
平衡器是 netscaler。
提前致谢
解决方案
在 IBM MQ 集群中,您可以连接到集群中的任何队列管理器并推送消息,MQ 确保消息到达其目标队列,即使在执行 put 时您连接的队列管理器上没有本地定义该队列行动。当应用程序从队列执行获取消息操作时,您必须连接到队列管理器,该队列管理器在本地定义了目标队列以选择消息。因此,是的,在您的情况下,您必须连接到每个单独的 4 个队列管理器才能选择消息。
由于您正在迁移到新的 MQ 架构,我建议您查看具有 RDQM 概念的 IBM MQ v9.1 Advanced。RDQM(复制数据队列管理器)是一种高可用性解决方案,目前可在 Linux 平台上使用。
在链接下方找到: https ://www.ibm.com/support/knowledgecenter/SSFKSJ_9.1.0/com.ibm.mq.con.doc/q130280_.htm
推荐阅读
- python - Face Plus Plus Python API 更改参数
- laravel - laravel 上的数据被检索了 2 次
- sql - AWS:Redshift 与传统 dbms
- c++ - Cuda 上的蒙特卡洛
- r - 较旧的 Mac 与较新的 Windows - R 真的无法分配足够的内存吗?
- c# - 正则表达式:匹配表达式周围最近的打开和关闭花括号内的所有内容
- javascript - Highcharts 同步图表与下拉选择而不是图例
- java - 通过 HTTP 请求发送大文件
- asp.net-mvc - ASP.NET MVC + Angular 6 中的一次调用主要,多次响应
- ruby - 使用 Time.now.between? 查看今天是否在两个日期期间之间