.net - 从 Azure 服务上的服务总线处理程序接收事件网格
问题描述
我正在事件网格上发送事件,例如。我可以看到他们到达 azure 仪表板
NomeEmailChange yay = new NomeEmailChange
{
Nome = "cesco",
Email = "cesco"
};
var primaryTopicKey = _config["EventGridConfig:AcessKey"];
var primaryTopic = _config["EventGridConfig:Endpoint"];
var primaryTopicHostname = new Uri(primaryTopic).Host;
var topicCredentials = new TopicCredentials(primaryTopicKey);
var client = new EventGridClient(topicCredentials);
var id = Guid.NewGuid().ToString();
var hey = new List<EventGridEvent>
{
new EventGridEvent()
{
Id = id,
EventType = "cesco-cesco",
Data = (yay),
EventTime = DateTime.Now,
Subject = "MS_Clientes",
DataVersion = "1.0",
}
};
;
client.PublishEventsAsync(primaryTopicHostname, hey);
然后我创建了一个事件网格订阅。我可以确认到达事件网格订阅的事件网格消息。
在另一个项目中,我订阅了如下所示的服务总线。它适用于消费直接发送到总线的消息。
public static IServiceCollection AddBus(this IServiceCollection services, IConfiguration configuration,
IHostingEnvironment env)
{
services.AddMassTransit(x => { x.AddConsumer<NomeEmailChangeConsumer>(); });
services.AddSingleton(provider => Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
var keyName = "RootManageSharedAccessKey";
var busName = configuration["ServiceBus:Name"];
var secret = configuration["ServiceBus:Secret"];
var host = cfg.Host(
"Endpoint=sb://" + busName + ".servicebus.windows.net/;" +
"SharedAccessKeyName=" + keyName + ";" +
"SharedAccessKey=" + secret,
z =>
{
TokenProvider
.CreateSharedAccessSignatureTokenProvider(keyName, secret);
});
cfg.UseExtensionsLogging(provider.GetService<ILoggerFactory>());
cfg.ReceiveEndpoint(host, configuration["ServiceBus:Topic"],
e => { e.Consumer<NomeEmailChangeConsumer>(provider); });
}));
services.AddSingleton<IPublishEndpoint>(provider => provider.GetRequiredService<IBusControl>());
services.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());
services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
services.AddScoped(provider => provider.GetRequiredService<IBus>().CreateRequestClient<NomeEmailChange>());
services.AddSingleton<IHostedService, BusService>();
return services;
}
现在一切都应该工作,但是当消息到达时,在另一个项目上我收到以下错误
fail: MassTransit.Messages[0]
R-FAULT sb://sbacompanharreldev.servicebus.windows.net/bff-queue 9ade19ec-238c-4c08-8e03-28bac695ea7b No deserializer was registered for the message content type: application/json; charset=utf-8. Supported content types include application/vnd.masstransit+json, application/vnd.masstransit+bson, application/vnd.masstransit+xml
System.Runtime.Serialization.SerializationException: No deserializer was registered for the message content type: application/json; charset=utf-8. Supported content types include application/vnd.masstransit+json, application/vnd.masstransit+bson, application/vnd.masstransit+xml
at MassTransit.Serialization.SupportedMessageDeserializers.Deserialize(ReceiveContext receiveContext)
at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
解决方案
正如建议的那样,您必须编写自定义反序列化器。在我的实现中,我改变了一些关于如何处理 MassTransit 包的事情。
首先,您必须为不同的内容类型注册反序列化器:
- 例如,如果您的消息来自 MassTransit 发布者,它将是默认消息 ContentType(在 C# 中):
JsonMessageSerializer.JsonContentType
- 如果您的消息来自 ServiceBus 队列/主题,通常是
"application/json"
以下代码显示了如何注册反序列化器,以及如何设置接收端点。与您的方法相比,一个区别是我使用了连接字符串,因此不需要 SAS 令牌部分。
class Program
{
static string ContentTypeJson = "application/json";
static async Task Main(string[] args)
{
var bus = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
var queueName = "Your SB Queue Name";
var connectionString = "Connection String with RooTManage policy";
var host = cfg.Host(connectionString, h =>
{
h.OperationTimeout = TimeSpan.FromSeconds(60);
});
cfg.ReceiveEndpoint(queueName,
e =>
{
e.AddMessageDeserializer(contentType: new ContentType(ContentTypeJson), () =>
{
return new EventGridMessgeDeserializer(ContentTypeJson);
});
e.Consumer(() => new EventGridMessageConsumer());
// Uncomment if required deserializer for local messages - mass transit as publisher or direct messages from SB
//e.AddMessageDeserializer(contentType: JsonMessageSerializer.JsonContentType, () =>
//{
// return new CustomMessageDeserializer(JsonMessageSerializer.JsonContentType.ToString());
//});
//e.Consumer(() => new MessageConsumer());
});
});
bus.Start();
Console.WriteLine("Press any key to exit");
// for testing purposes of local messages - mass transit as publisher
// await bus.Publish<CustomMessage>(new { Hello = "Hello, World." });
await Task.Run(() => Console.ReadKey());
await bus.StopAsync();
}
我使用消息模拟器将消息传送到 EventGrid,EventGrid 将消息转发到 SB 队列,在下面您可以看到运行此代码的结果:
事件网格消息的反序列化器以及完整代码,您可以在 Github 上找到:https ://github.com/kgalic/MassTransitSample以及我对您其他问题的回答。
推荐阅读
- python - Tensorflow 2.0.0-alpha0: tf.logging.set_verbosity
- r - 在ggplot中使用aes的两种方式有什么区别?
- java - 访问以编程方式创建的 recyclerview 的子项(Chip)
- php - Laravel:我如何提交表单并保持在同一个视图中?
- kendo-ui - 如何扩大 Kendo UI 网格中的弹出窗口?
- prestashop - Prestashop 未在索引页面上定义
- html - 在 Xaringan 中创建图像网格
- java - JavaFx TextArea freezes when calling appendText() inside a loop
- swift - Swift Reverse Geocoding get name of state always in same language
- arrays - Python 通过“\”加入列表