首页 > 解决方案 > 如何在 Orchestrator 中对大数据进行多线程处理?

问题描述

我在 Orchestrator 中有以下代码:

        var parallelTasks = new List<Task>();

        // Get Records
        List<Record> records = await context.CallActivityAsync<List<Record>>("GetRecords", orchestrationContext);

        // Write Records
        foreach (Record record in records)
        {
            parallelTasks.Add(context.CallActivityAsync<int>("WriteRecord", record));
        }

        await Task.WhenAll(parallelTasks);

这失败了,因为 GetRecords 返回了太多数据(60000 条记录),并且由于 CallActivityAsync 无法返回超过 8mb 的数据,Orchestrator 无法继续。

这也可能会失败,因为它实际上会尝试为每次写入启动 60000 个活动。

我这样做是为了让 Azure 使用多个线程写入 ADL。起初我尝试使用信号量,网上的多个消息来源告诉我不应该使用信号量,而是使用“CallActivityAsync”,这将允许 Azure 管理自己的线程。

我该如何解决这个问题并实现对 ADL 的多线程写入?

作为记录,我使用的库一次只能写入一个文件(我知道 MS 的新库包括批量写入,但由于不同的原因我无法使用它)。

标签: c#azureazure-functionsorchestrationazure-durable-functions

解决方案


是否有理由GetRecordsWriteRecord处于耐用功能设置中?如果没有,GetRecords可以将每个Record对象(序列化为 JSON)拖放到 Azure 队列/EventHub,而不是返回一个巨大的列表。然后WriteRecords可以从该 Queue/EventHub 触发以处理每条消息。


推荐阅读