.net-core - 在 ASP.NET Core 的 IHostedService 中使用 RabbitMQ
问题描述
我遇到了一个需要帮助的问题。我正在开发一个后台进程,用来监听 RabbitMQ 服务器中的队列。如果在 .NET Core 控制台应用程序中运行,它工作正常。但是,我想用一种更优雅的方式来实现,例如 Web 服务(这给我带来了很多麻烦,因为它在安装后无法工作)或 IIS 托管的 Web 应用程序。当我尝试在 .NET Core Web 应用程序中以托管服务 (IHostedService) 的形式运行它时,遇到了作用域服务 (scoped service) 的问题。
下面的代码在控制台应用程序中运行良好。如何让它在 .NET Core Web 应用程序中作为 IHostedService 运行?我应该修改什么?感谢您的帮助。代码:
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using PaymentProcessor.Models;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Microsoft.EntityFrameworkCore;
namespace PaymentProcessor
{
/// <summary>
/// Hosted background service that consumes payment messages from the
/// <c>payment_processing_queue</c> RabbitMQ queue, runs the
/// <c>dbo.PaymentReferencesProcessSingle</c> stored procedure for each one and
/// forwards the reference to the integration queue. Failed messages are
/// re-published to the retry queue until <c>Settings:MaxRetries</c> is reached,
/// after which they are routed to the error queue.
/// </summary>
public class PaymentProcessingService : HostedService
{
    private const string ExchangeName = "payment_processing_exchange";
    private const string ExchangeType = "topic";

    private const string ProcessingQueue = "payment_processing_queue";
    private const string RetryQueue = "payment_processing_retry_queue";
    private const string ErrorQueue = "payment_processing_error_queue";
    private const string IntegrationQueue = "payment_integration_queue";

    private const string ProcessingRoutingKey = "processing";
    private const string RetryRoutingKey = "processing_retry";
    private const string ErrorRoutingKey = "processing_error";
    private const string IntegrationRoutingKey = "integration";

    /// <summary>Pause between two connection attempts while the broker is unreachable.</summary>
    private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(10);

    private readonly IConfiguration configuration;
    private readonly EntitiesContext claimsContext;
    private readonly string connectionString;
    private readonly string HostName = "";
    private readonly string UserName = "";
    private readonly string Password = "";
    // Instance field: it is assigned per instance in the constructor, so it should not be static.
    private readonly int MaxRetries;
    private IConnectionFactory factory;
    private IConnection connection;
    private IModel channel;

    /// <summary>
    /// Reads the database connection string and the RabbitMQ settings
    /// (<c>Settings:HostName</c>, <c>Settings:UserName</c>, <c>Settings:Password</c>,
    /// <c>Settings:MaxRetries</c>) from <paramref name="configuration"/>.
    /// </summary>
    /// <param name="configuration">Application configuration.</param>
    public PaymentProcessingService(IConfiguration configuration)
    {
        this.configuration = configuration;
        this.connectionString = configuration.GetConnectionString("StagingContext");
        claimsContext = new EntitiesContext(connectionString);
        HostName = this.configuration.GetValue<string>("Settings:HostName");
        UserName = this.configuration.GetValue<string>("Settings:UserName");
        Password = this.configuration.GetValue<string>("Settings:Password");
        MaxRetries = this.configuration.GetValue<string>("Settings:MaxRetries").ConvertTo<int>();
    }

    /// <summary>
    /// Connects to RabbitMQ, declares the exchange/queues and starts the consumer.
    /// While the broker cannot be reached the attempt is repeated every
    /// <see cref="ReconnectDelay"/> until it succeeds or
    /// <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="cancellationToken">Signals that the service should stop.</param>
    /// <returns>A task that completes once the consumer is running or the token is cancelled.</returns>
    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        factory = new ConnectionFactory { HostName = HostName, UserName = UserName, Password = Password };

        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                connection = factory.CreateConnection();
                channel = connection.CreateModel();
                DeclareTopology(channel);
                StartConsuming(channel);

                // Release the channel and connection when the token is cancelled.
                cancellationToken.Register(CloseConnection);
                return;
            }
            catch (Exception ex)
            {
                LogError("Could not start the RabbitMQ consumer; retrying in " + ReconnectDelay, ex);
                // Do not leak a half-open connection/channel from the failed attempt.
                CloseConnection();
            }

            try
            {
                // Non-blocking wait (instead of Thread.Sleep) that ends early on shutdown.
                await Task.Delay(ReconnectDelay, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                return;
            }
        }
    }

    /// <summary>Declares the topic exchange and the four durable queues bound to it.</summary>
    private static void DeclareTopology(IModel activeChannel)
    {
        // The exchange must exist before any queue is bound to it. Declaring it once is
        // enough; the original code re-declared it (twice under a misspelled name).
        activeChannel.ExchangeDeclare(ExchangeName, ExchangeType);

        activeChannel.QueueDeclare(ProcessingQueue, true, false, false, null);
        activeChannel.QueueBind(ProcessingQueue, ExchangeName, ProcessingRoutingKey);

        // NOTE(review): the dead-letter routing key equals the retry queue's own binding key,
        // so expired messages do not appear to be routed back to "processing" - confirm the
        // intent. Changing queue arguments requires re-creating the existing queue.
        var retryQueueArgs = new Dictionary<string, object>
        {
            { "x-dead-letter-exchange", ExchangeName },
            { "x-dead-letter-routing-key", RetryRoutingKey },
            { "x-message-ttl", 10000 }
        };
        activeChannel.QueueDeclare(RetryQueue, true, false, false, retryQueueArgs);
        activeChannel.QueueBind(RetryQueue, ExchangeName, RetryRoutingKey, null);

        activeChannel.QueueDeclare(ErrorQueue, true, false, false, null);
        activeChannel.QueueBind(ErrorQueue, ExchangeName, ErrorRoutingKey, null);

        activeChannel.QueueDeclare(IntegrationQueue, true, false, false, null);
        activeChannel.QueueBind(IntegrationQueue, ExchangeName, IntegrationRoutingKey, null);
    }

    /// <summary>Sets the prefetch count to 1 and subscribes a consumer to the processing queue with manual acks.</summary>
    private void StartConsuming(IModel activeChannel)
    {
        activeChannel.BasicQos(0, 1, false);

        var consumer = new EventingBasicConsumer(activeChannel);
        // The handler uses the channel it was created for, not the mutable field.
        consumer.Received += (model, ea) => HandleMessage(activeChannel, ea);

        activeChannel.BasicConsume(queue: ProcessingQueue, autoAck: false, consumer: consumer);
    }

    /// <summary>
    /// Processes one delivery. The delivery is always acknowledged; a failed
    /// message is re-published to the retry or error queue before the ack.
    /// </summary>
    private void HandleMessage(IModel activeChannel, BasicDeliverEventArgs ea)
    {
        var message = ea.Body.DeSerializeText();
        try
        {
            ProcessPayment(activeChannel, message);
        }
        catch (Exception ex)
        {
            LogError("Processing a payment message failed", ex);
            RouteFailedMessage(activeChannel, ea, message);
        }
        finally
        {
            activeChannel.BasicAck(ea.DeliveryTag, false);
        }
    }

    /// <summary>
    /// Extracts <c>Message.ReferenceNo</c> from the JSON envelope, executes the
    /// stored procedure for it and publishes the reference to the integration queue.
    /// </summary>
    private void ProcessPayment(IModel activeChannel, string message)
    {
        JToken payload = JObject.Parse(message)["Message"]
            ?? throw new InvalidOperationException("The message envelope has no 'Message' payload.");
        string referenceNo = (string)payload["ReferenceNo"];

        var parameters = new[]
        {
            new SqlParameter
            {
                DbType = DbType.String,
                ParameterName = "ReferenceNo",
                Value = referenceNo
            }
        };
        claimsContext.Database.ExecuteSqlCommand("dbo.PaymentReferencesProcessSingle @ReferenceNo", parameters);

        IBasicProperties props = activeChannel.CreateBasicProperties();
        props.Persistent = true;
        props.ContentType = "text/plain";
        props.DeliveryMode = 2;

        var integrationEnvelope = new MessageEnvelope
        {
            RetryCounts = 0,
            Message = JObject.FromObject(new { ReferenceNo = referenceNo })
        };
        activeChannel.BasicPublish(ExchangeName, IntegrationRoutingKey, props, integrationEnvelope.Serialize());
    }

    /// <summary>
    /// Re-publishes a failed message: to the retry queue (with an incremented
    /// retry counter) while the counter is below the configured maximum, otherwise
    /// to the error queue. A body that cannot be read as a <c>MessageEnvelope</c>
    /// is sent to the error queue unchanged instead of being lost.
    /// </summary>
    private void RouteFailedMessage(IModel activeChannel, BasicDeliverEventArgs ea, string message)
    {
        MessageEnvelope envelope = null;
        try
        {
            envelope = JsonConvert.DeserializeObject<MessageEnvelope>(message);
        }
        catch (JsonException ex)
        {
            LogError("The failed message is not a valid MessageEnvelope", ex);
        }

        // NOTE(review): as in the original code, retry/error messages are published without
        // basic properties - confirm whether they should be marked persistent as well.
        if (envelope == null)
        {
            activeChannel.BasicPublish(ExchangeName, ErrorRoutingKey, null, ea.Body);
            return;
        }

        if (envelope.RetryCounts < MaxRetries)
        {
            var retryEnvelope = new MessageEnvelope
            {
                RetryCounts = envelope.RetryCounts + 1,
                Message = envelope.Message
            };
            activeChannel.BasicPublish(ExchangeName, RetryRoutingKey, null, retryEnvelope.Serialize());
        }
        else
        {
            activeChannel.BasicPublish(ExchangeName, ErrorRoutingKey, null, envelope.Serialize());
        }
    }

    /// <summary>Best-effort disposal of the current channel and connection.</summary>
    private void CloseConnection()
    {
        try
        {
            channel?.Dispose();
        }
        catch (Exception ex)
        {
            LogError("Closing the RabbitMQ channel failed", ex);
        }

        try
        {
            connection?.Dispose();
        }
        catch (Exception ex)
        {
            LogError("Closing the RabbitMQ connection failed", ex);
        }

        channel = null;
        connection = null;
    }

    /// <summary>Writes an error to standard error so that failures are no longer swallowed silently.</summary>
    private static void LogError(string context, Exception ex)
    {
        Console.Error.WriteLine($"[PaymentProcessingService] {context}: {ex}");
    }
}
}
接着
// Registers PaymentProcessingService as an IHostedService with a scoped lifetime.
// NOTE(review): the solution further down registers its hosted service with
// services.AddHostedService<T>() instead of AddScoped.
services.AddScoped<IHostedService, PaymentProcessingService>();
解决方案
正如 Dekim 提到的,应该注册一个服务。
请查看我在 GitHub 上创建的示例。
Program.cs 看起来像这样:
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System.Threading.Tasks;
namespace Core
{
/// <summary>
/// Console entry point: builds a generic host, registers the RabbitMQ hosted
/// service and runs the host via <c>RunConsoleAsync</c>.
/// </summary>
internal class Program
{
    /// <summary>Creates the host builder, wires up the services and awaits the console host.</summary>
    /// <param name="args">Command-line arguments (not used).</param>
    static async Task Main(string[] args)
    {
        IHostBuilder builder = new HostBuilder().ConfigureServices(RegisterServices);
        await builder.RunConsoleAsync();
    }

    /// <summary>Adds the application's services to the container.</summary>
    private static void RegisterServices(HostBuilderContext hostContext, IServiceCollection services)
    {
        // register our service here
        services.AddHostedService<ServiceRabbitMQ>();
    }
}
}
推荐阅读
- google-custom-search - 将 Google 自定义搜索布局从移动设备更改为桌面设备?
- advanced-custom-fields - wp_mail 在 acf/pre_save_post 或 acf/save_post 中不起作用
- mysql - 选择多个相似的行
- mongodb - 在 $sort 聚合管道之后向文档添加字段,其中使用 MongoDb 聚合将其索引包含在排序列表中
- javascript - 如何从 Web API 接收图像数据并将其设置为图像标签的 src?
- ios - 解决 Xcode 11 beta 5 中的 SwiftUI Path 崩溃问题
- typescript - 预定义参数形状到众所周知的返回类型的 Typescript 通用映射
- zurb-foundation - 如何防止 Foundation watch 向我的文件添加 html 内容?
- python - 无法读取 Javascript 文件 Electron JS 中未定义的属性“加入”
- angular - 如何使用 Kendo 上传 Angular 以使用进度指示器上传到 S3