首页 > 解决方案 > RABBITMQ C# 在控制台上运行良好,但不适用于服务

问题描述

我通过https://www.rabbitmq.com/dotnet-api-guide.html或同一网站上的教程学习如何使用来自 RabbitMQ 的消息。

它在控制台应用程序上运行良好!我在做什么?我只是使用消息并发布返回,以便队列中的消息完成,删除。没关系!现在我正在尝试创建一个多线程 Windows 服务来执行此操作,因为我需要一种更好更快的方法来消耗队列中的大量消息。

我发现很难找到创建服务的信息,所以我决定问你是否可以帮助我

首先有一个奇怪的情况,我无法理解。让我们看看下面消费消息的基本结构:

public static void Main()
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using(var connection = factory.CreateConnection())
    using(var channel = connection.CreateModel())
    {
        channel.QueueDeclare(queue: "task_queue",durable: true,exclusive: false,autoDelete: false, arguments: null);

        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        Console.WriteLine(" [*] Waiting for messages.");

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine(" [x] Received {0}", message);

            int dots = message.Split('.').Length - 1;
            Thread.Sleep(dots * 1000);

            Console.WriteLine(" [x] Done");

            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };
        channel.BasicConsume(queue: "task_queue",autoAck: false,consumer: consumer);

        Console.WriteLine(" Press [enter] to exit.");
    =>> Console.ReadLine(); <<=
    }
}

看到“Console.ReadLine();”了吗?在控制台应用程序上,此命令可防止连接和通道配置!没有这个 RabbitMQ 会返回错误信息!(代码=200,文本=“再见”)!但没关系,我不会在控制台应用程序上删除它!但是我如何在 Windows 服务上使用它呢?

所以我在这里找到了一个解决方案,我可以提供一个服务来消费消息,但它只适用于 1 或 2 条消息。我有线程来消费消息,但是如果队列上有很多消息,消费者(线程)会尝试消费相同的消息,或者不接收任何消息,从而导致错误。我尝试了不同的方法来解决这个问题,但结果是一样的。我想做一个可以作为控制台应用程序运行良好的服务,但使用多线程方式更快。

我要做的是: 1 - 使用消息 2 - 使用消息中的数据执行某些操作 3 - 向队列发布关于该消息的返回,以便消息完成。

我的代码:

public partial class Service1 : ServiceBase
{
    public static ConnectionFactory factory;
    public static string fila = "QUEUE";
    public static int threads = 0;
    private Thread executeThread;


    public Service1()
    {
        InitializeComponent();
    }

    protected override void OnStart(string[] args)
    {
        factory = new ConnectionFactory();
        {
            System.Uri uri = new System.Uri("amqp://ARABBITMQNURL:8080");
            factory.Uri = uri;
        };

        try
        {
            executeThread = new Thread(new ThreadStart(start));
            executeThread.Start();
            File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n starting");

        }
        catch (Exception ex)
        {
            File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n starting error: " + ex.Message);
        }

    }

    private static void start()
    {
        while (true)
        {

            ThreadStart go = new ThreadStart(ServerRPC);
            Thread T = new Thread(go);
            T.Start();

            Thread.Sleep(1000);

        }
    }

    protected override void OnStop()
    {
        EventLog.WriteEntry("Ivi client stopped", EventLogEntryType.Warning);
    }

    private static void ServerRPC()
    {
        threads++;

        File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n"+ threads);

        IConnection connection = factory.CreateConnection();
        IModel channel = connection.CreateModel();

        channel.QueueDeclare(queue: fila, durable: true, exclusive: false, autoDelete: false, arguments: null);
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        var consumer = new EventingBasicConsumer(channel);

        ConsumoMSG.Consumer(fila, consumer, channel, connection);

        channel.BasicConsume(queue: fila, autoAck: false, consumer: consumer);

    }

}

public static void Consumer(string fila, EventingBasicConsumer consumer, IModel channel, IConnection connection)
    {
        channel.BasicConsume(queue: fila, autoAck: false, consumer: consumer);

        consumer.Received += (ch, ea) =>
        {
            var body = ea.Body;
            var props = ea.BasicProperties;
            var replyProps = channel.CreateBasicProperties();
            replyProps.CorrelationId = props.CorrelationId;
            var message = Encoding.UTF8.GetString(body);

            File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n " + message);

                Execute.Powershell(); //this start a ps1 that sleep for 5 sec.

                var response = "{\"code\":\"500\",\"response\":\"PowershellReturn\"}";

                var responseBytes = Encoding.UTF8.GetBytes(response);

                try
                {
                    channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);
                    channel.BasicAck(ea.DeliveryTag, false);
                    File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n PUBLISH OK!! routingKey: " + props.ReplyTo + "basicProperties: " + replyProps);
                Service1.threads--;
                }
                catch (Exception e)
                {
                    File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n ERROR PUBLISH: " + e.Message);
                    File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n ERROR PUBLISH routingKey: " + props.ReplyTo + "basicProperties: " + replyProps);
                }
            channel.Close();
            connection.Close();

        };
    }
}

我可以用另一种方法来做到这一点吗?我做错了什么?

谢谢!

标签: c#multithreadingservicerabbitmq

解决方案


此代码用作 Windows 服务。它使用 TopShelf ( http://topshelf-project.com )。

请阅读第二行的评论。

class Program
    {
        //I declare as class fields so they are not garbage collected and the consumer can stay alive
        private EventingBasicConsumer _consumer;
        private IModel _channel;
        private IConnection _connection;

        public bool Stop()
        {
            _channel.Dispose();
            _connection.Dispose();
            return true;
        }

        public bool Start()
        {

            var factory = new ConnectionFactory() {HostName = "localhost"};
            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();

            _channel.QueueDeclare(queue: "hello",
                durable: false,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            _consumer = new EventingBasicConsumer(_channel);
            _consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);

            };
            _channel.BasicConsume(queue: "hello",
                autoAck: true,
                consumer: _consumer);

            return true;
        }


        public static void Main()
        {

            var rc = HostFactory.Run(x => //1
            {
                x.Service<Program>(s => //2
                {
                    s.ConstructUsing(name => new Program()); //3
                    s.WhenStarted(tc => tc.Start()); //4
                    s.WhenStopped(tc => tc.Stop()); //5
                });
                x.RunAsLocalSystem(); //6

                x.SetDescription("Sample Topshelf Host"); //7
                x.SetDisplayName("Stuff"); //8
                x.SetServiceName("Stuff"); //9
            }); //10

            var exitCode = (int) Convert.ChangeType(rc, rc.GetTypeCode()); //11
            Environment.ExitCode = exitCode;
        }
    }

推荐阅读