c# - 如何在 Orchestrator 中对大数据进行多线程处理?
问题描述
我在 Orchestrator 中有以下代码:
var parallelTasks = new List<Task>();
// Get Records
List<Record> records = await context.CallActivityAsync<List<Record>>("GetRecords", orchestrationContext);
// Write Records
foreach (Record record in records)
{
parallelTasks.Add(context.CallActivityAsync<int>("WriteRecord", record));
}
await Task.WhenAll(parallelTasks);
这失败了,因为 GetRecords 返回了太多数据(60000 条记录),并且由于 CallActivityAsync 无法返回超过 8mb 的数据,Orchestrator 无法继续。
这也可能会失败,因为它实际上会尝试为每次写入启动 60000 个活动。
我这样做是为了让 Azure 使用多个线程写入 ADL。起初我尝试使用信号量,网上的多个消息来源告诉我不应该使用信号量,而是使用“CallActivityAsync”,这将允许 Azure 管理自己的线程。
我该如何解决这个问题并实现对 ADL 的多线程写入?
作为记录,我使用的库一次只能写入一个文件(我知道 MS 的新库包括批量写入,但由于不同的原因我无法使用它)。
解决方案
是否有理由GetRecords
并WriteRecord
处于耐用功能设置中?如果没有,GetRecords
可以将每个Record
对象(序列化为 JSON)拖放到 Azure 队列/EventHub,而不是返回一个巨大的列表。然后WriteRecords
可以从该 Queue/EventHub 触发以处理每条消息。
推荐阅读
- python - 如何在 python 中使用 pyplot 按数组绘制图形?
- swift - Swift - 如何使 StackView 100% 宽度
- sql - 从对偶中选择 regexp_replace('aaa_bbb', '(_.)', upper('\1'))
- javascript - 为什么不全局安装 webpack?
- python - 如何为 Google OR-Tools 工作坊示例添加截止日期?
- c++ - 将第一个派生类转换为第二个派生类 - 为什么会起作用?
- sql-server - SQL Server 2016 中的 STRING_AGG 替换
- hibernate - Weblogic 10 EJB 提交中的超时异常
- hyperledger-fabric - 组织之间的 TLS 握手失败
- java - .toNanos() 方法无法正常工作