activemq - Consuming with a selector fails to remove all matching messages from an ActiveMQ queue
Problem description
I have written a C# utility to consume (and thereby delete) certain messages in an ActiveMQ queue. The utility fails to remove all of the messages that match the selector. Is there a problem with ActiveMQ or, more likely, am I doing something incorrectly? A similar utility running against Solace does not show the same issue.
The message producer adds an identifying property to each message; the utility uses a selector to consume only the messages whose property has a certain value. The messages being deleted are those for application entities that users have decided to cancel.
When the number of messages in the queue is small, e.g. 100, the queue-draining utility works as planned. When the number of messages is large, e.g. 10000, the utility removes only roughly the first 50 of the messages that should be removed, leaving a large number of matching messages in the queue.
The two code samples below are taken from LINQPad scripts that demonstrate the issue. The first creates 10000 messages, each tagged randomly with one of 10 strings ('aaaaaaaa' to 'jjjjjjjj'). The second attempts to remove the messages tagged with 'aaaaaaaa'. The first block generates around 1000 'aaaaaaaa' messages, but the second block drains only around 50.
(I am using ActiveMQ 5.16.3 on Windows; the LINQPad scripts reference Apache.NMS.ActiveMQ.NetCore 1.7.3 running on .NET 5.0.)
producer.linq
string queueName = "queue.1";
var rnd = new System.Random();
var tags = new[] {
    "aaaaaaaa", "bbbbbbbb", "cccccccc", "dddddddd", "eeeeeeee",
    "ffffffff", "gggggggg", "hhhhhhhh", "iiiiiiii", "jjjjjjjj"};
Uri uri = new Uri("activemq:tcp://localhost:61616");
var connectionFactory = new Apache.NMS.ActiveMQ.ConnectionFactory(uri);
using IConnection connection = connectionFactory.CreateConnection();
using ISession session = connection.CreateSession( AcknowledgementMode.AutoAcknowledge);
using IDestination destination = session.GetQueue(queueName);
connection.Start();
using IMessageProducer producer = session.CreateProducer(destination);
producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
producer.Priority = MsgPriority.Normal;
producer.RequestTimeout = TimeSpan.FromSeconds(1000.0);
int numDrainTargets = 0;
int numMsgsToSend = 10000;
for (int ctr = 0; ctr < numMsgsToSend; ++ctr)
{
    var msg = producer.CreateTextMessage();
    msg.Text = $"msg: {ctr,4}";
    // Pick a random tag; index 0 ('aaaaaaaa') is the one the drainer targets.
    int tagIndex = rnd.Next(tags.Length);
    if (tagIndex == 0)
        ++numDrainTargets;
    msg.Properties.SetString("MyKey", tags[tagIndex]);
    producer.Send(msg);
}
numDrainTargets.Dump("numDrainTargets");
"exiting producer".Dump();
drainer.linq
string queueName = "queue.1";
Uri uri = new Uri("activemq:tcp://localhost:61616");
IConnectionFactory factory = new Apache.NMS.ActiveMQ.ConnectionFactory(uri);
using IConnection connection = factory.CreateConnection();
using ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
using IDestination destination = session.GetQueue(queueName);
connection.Start();
string selector = "MyKey = 'aaaaaaaa'";
using IMessageConsumer consumer = session.CreateConsumer(destination, selector);
var waitTime = TimeSpan.FromSeconds(0.1);
int drainedCtr = 0;
IMessage msg;
// Receiving with AutoAcknowledge removes each matching message from the queue.
while ((msg = consumer.Receive(waitTime)) != null)
{
    ++drainedCtr;
}
drainedCtr.Dump("num drained");
Solution
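One possible explanation, offered as an assumption rather than a confirmed diagnosis: ActiveMQ 5.x evaluates a consumer's selector only against the messages it has paged in for dispatch, and the number paged in per queue is capped by the `maxPageSize` destination policy (200 by default). If the paged-in messages do not match any consumer's selector, they stay there, and matching messages further back in the queue are not reached. If that is what is happening here, raising `maxPageSize` for the queue in the broker's `activemq.xml` may let the drainer see more of the backlog. The fragment below is only a sketch: the value 10000 is illustrative, chosen to cover the test queue depth, and a larger page size increases broker memory use.

```xml
<!-- Goes inside the <broker> element of conf/activemq.xml -->
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <!-- queue.1 is the test queue from the scripts above; 10000 is an illustrative value -->
      <policyEntry queue="queue.1" maxPageSize="10000"/>
    </policyEntries>
  </policyMap>
</destinationPolicy>
```

The broker has to be restarted for the policy to take effect. Re-running producer.linq and then drainer.linq should show whether "num drained" now approaches numDrainTargets.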