c# - RABBITMQ C# 在控制台上运行良好,但不适用于服务
问题描述
我通过https://www.rabbitmq.com/dotnet-api-guide.html或同一网站上的教程学习如何使用来自 RabbitMQ 的消息。
它在控制台应用程序上运行良好!我在做什么?我只是使用消息并发布返回,以便队列中的消息完成,删除。没关系!现在我正在尝试创建一个多线程 Windows 服务来执行此操作,因为我需要一种更好更快的方法来消耗队列中的大量消息。
我发现很难找到创建服务的信息,所以我决定问你是否可以帮助我
首先有一个奇怪的情况,我无法理解。让我们看看下面消费消息的基本结构:
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",durable: true,exclusive: false,autoDelete: false, arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue",autoAck: false,consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
=>> Console.ReadLine(); <<=
}
}
看到“Console.ReadLine();”了吗?在控制台应用程序上,此命令可防止连接和通道配置!没有这个 RabbitMQ 会返回错误信息!(代码=200,文本=“再见”)!但没关系,我不会在控制台应用程序上删除它!但是我如何在 Windows 服务上使用它呢?
所以我在这里找到了一个解决方案,我可以提供一个服务来消费消息,但它只适用于 1 或 2 条消息。我有线程来消费消息,但是如果队列上有很多消息,消费者(线程)会尝试消费相同的消息,或者不接收任何消息,从而导致错误。我尝试了不同的方法来解决这个问题,但结果是一样的。我想做一个可以作为控制台应用程序运行良好的服务,但使用多线程方式更快。
我要做的是: 1 - 使用消息 2 - 使用消息中的数据执行某些操作 3 - 向队列发布关于该消息的返回,以便消息完成。
我的代码:
public partial class Service1 : ServiceBase
{
public static ConnectionFactory factory;
public static string fila = "QUEUE";
public static int threads = 0;
private Thread executeThread;
public Service1()
{
InitializeComponent();
}
protected override void OnStart(string[] args)
{
factory = new ConnectionFactory();
{
System.Uri uri = new System.Uri("amqp://ARABBITMQNURL:8080");
factory.Uri = uri;
};
try
{
executeThread = new Thread(new ThreadStart(start));
executeThread.Start();
File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n starting");
}
catch (Exception ex)
{
File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n starting error: " + ex.Message);
}
}
private static void start()
{
while (true)
{
ThreadStart go = new ThreadStart(ServerRPC);
Thread T = new Thread(go);
T.Start();
Thread.Sleep(1000);
}
}
protected override void OnStop()
{
EventLog.WriteEntry("Ivi client stopped", EventLogEntryType.Warning);
}
private static void ServerRPC()
{
threads++;
File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n"+ threads);
IConnection connection = factory.CreateConnection();
IModel channel = connection.CreateModel();
channel.QueueDeclare(queue: fila, durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
ConsumoMSG.Consumer(fila, consumer, channel, connection);
channel.BasicConsume(queue: fila, autoAck: false, consumer: consumer);
}
}
public static void Consumer(string fila, EventingBasicConsumer consumer, IModel channel, IConnection connection)
{
channel.BasicConsume(queue: fila, autoAck: false, consumer: consumer);
consumer.Received += (ch, ea) =>
{
var body = ea.Body;
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
var message = Encoding.UTF8.GetString(body);
File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n " + message);
Execute.Powershell(); //this start a ps1 that sleep for 5 sec.
var response = "{\"code\":\"500\",\"response\":\"PowershellReturn\"}";
var responseBytes = Encoding.UTF8.GetBytes(response);
try
{
channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);
channel.BasicAck(ea.DeliveryTag, false);
File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n PUBLISH OK!! routingKey: " + props.ReplyTo + "basicProperties: " + replyProps);
Service1.threads--;
}
catch (Exception e)
{
File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n ERROR PUBLISH: " + e.Message);
File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n ERROR PUBLISH routingKey: " + props.ReplyTo + "basicProperties: " + replyProps);
}
channel.Close();
connection.Close();
};
}
}
我可以用另一种方法来做到这一点吗?我做错了什么?
谢谢!
解决方案
此代码用作 Windows 服务。它使用 TopShelf ( http://topshelf-project.com )。
请阅读第二行的评论。
class Program
{
//I declare as class fields so they are not garbage collected and the consumer can stay alive
private EventingBasicConsumer _consumer;
private IModel _channel;
private IConnection _connection;
public bool Stop()
{
_channel.Dispose();
_connection.Dispose();
return true;
}
public bool Start()
{
var factory = new ConnectionFactory() {HostName = "localhost"};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
_consumer = new EventingBasicConsumer(_channel);
_consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
};
_channel.BasicConsume(queue: "hello",
autoAck: true,
consumer: _consumer);
return true;
}
public static void Main()
{
var rc = HostFactory.Run(x => //1
{
x.Service<Program>(s => //2
{
s.ConstructUsing(name => new Program()); //3
s.WhenStarted(tc => tc.Start()); //4
s.WhenStopped(tc => tc.Stop()); //5
});
x.RunAsLocalSystem(); //6
x.SetDescription("Sample Topshelf Host"); //7
x.SetDisplayName("Stuff"); //8
x.SetServiceName("Stuff"); //9
}); //10
var exitCode = (int) Convert.ChangeType(rc, rc.GetTypeCode()); //11
Environment.ExitCode = exitCode;
}
}
推荐阅读
- android - 为什么我不能从最低 api 级别下拉列表中选择最低 android api 级别 15(统一 2017 或更高版本)
- python - python - 如何使用 X=datetime 列和 Y=column distinct count 字符串创建堆叠直方图(或条形图)
- javascript - 移动模式下的文本无法自我修复
- javascript - Gremlin 在 javascript AWS Lambda 中获得计数
- javascript - JavaScript 引导程序不起作用
- java - 如何在 pl/pgsql 中创建返回 refcursor 和 totalRow 的函数/过程?
- javascript - 纠正这个for循环?
- android - 重新映射设备电源按钮以显示对话框
- node.js - 如何按顺序解构多个查询结果?
- c# - 是否可以在单个 API 调用中从 Azure Blob 存储中检索多个 Blob?