azure - 将事件从 Azure 事件中心传送到 Azure 服务总线
问题描述
我正在收听各种事件的事件中心。
- 每个事件都是高价值的,不能错过。
- 事件根据设备 ID 进行分区。
- 来自一个设备 id 的事件是稀疏的并且不是很频繁(每隔几天就有几个事件)。它仅在响应不常见的用户操作时发生。
- 设备数量巨大,因此我会针对各种设备 Id 进行大量事件。
对于每个事件,我需要对不太可靠的系统进行 3-4 次 API 调用。由于其中一些是跨地理呼叫,因此可能需要一些时间。
我计划从事件中心获取事件并将它们放入服务总线。我的理由如下。
- 事件中心只能扩展到 32 个分区,如果一个事件需要时间,整个分区就会被阻塞。
- 另一方面,服务总线更具水平可扩展性。如果吞吐量下降,我可以向服务总线添加更多订阅者。
我一直在寻找这样的模式,但我还没有看到我们从基于日志的消息传递系统获取数据并将它们推送到基于队列的模式。
有没有更好的方法来处理这种情况?
解决方案
我认为您可以使用事件中心触发器和服务总线输出绑定来实现您想要的。
例如,我想监视事件中心“测试”并且我正在使用 C# 库:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
namespace FunctionApp68
{
public static class Function1
{
[FunctionName("Function1")]
[return: ServiceBus("test1", Connection = "ServiceBusConnection")]
public static string Run([EventHubTrigger("test", Connection = "str")] EventData[] events, ILogger log)
{
var exceptions = new List<Exception>();
string messageBodyt = "";
foreach (EventData eventData in events)
{
try
{
string messageBody = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
messageBodyt = messageBodyt + messageBody;
// Replace these two lines with your processing logic.
log.LogInformation($"C# Event Hub trigger function processed a message: {messageBody}");
//await Task.Yield();
}
catch (Exception e)
{
// We need to keep processing the rest of the batch - capture this exception and continue.
// Also, consider capturing details of the message that failed processing so it can be processed again later.
exceptions.Add(e);
}
}
// Once processing of the batch is complete, if any messages in the batch failed processing throw an exception so that there is a record of the failure.
if (exceptions.Count > 1)
throw new AggregateException(exceptions);
if (exceptions.Count == 1)
throw exceptions.Single();
return messageBodyt;
}
}
}
上面的代码将从事件中心“test”收集并保存到服务总线队列“test1”。
看看这些文档:
推荐阅读
- google-calendar-api - Google Calendar 支持的“calendarId”官方列表
- python - 如何使用python解析键值数据
- python - 将所有构造函数参数作为实例属性添加到 PyCharm 中的类
- reactjs - 对于 React 和 TypeScript,哪种方法是首选方法
- google-apps-script - 每当在谷歌表格中编辑另一个单元格时,是否可以使用简单的触发器来增加一个单元格?
- algorithm - 为什么深度优先搜索的空间复杂度不表示为 O(n)?
- c - 如何将结构传递给系统调用,然后将内核数据复制到该结构
- python - 使用 zappa 部署时如何使用自定义 lambda 名称而不是自动生成的名称?
- javascript - 如何将此 javascript 和 HTML 代码添加到 Angular 项目中?我可以从 javascript 函数中以角度呈现 html 吗?
- html - 为什么在打印时部分出现在屏幕上的不可见元素?