Cosmos inserts won't parallelize effectively

Problem description

Meta question:
We're pulling data from EventHub, running some logic, and saving it to Cosmos. Right now, Cosmos inserts are our bottleneck. How do we maximize our throughput?

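Details
We're trying to optimize our Cosmos throughput, and there seems to be some contention in the SDK that makes parallel inserts only slightly faster than serial inserts.
Logically, we're doing this: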

            for (int i = 0; i < insertCount; i++)
            {
                taskList.Add(InsertCosmos(sdkContainerClient));
            }
            var parallelTimes = await Task.WhenAll(taskList);

Here are the results comparing serial inserts, parallel inserts, and "faked" inserts (using Task.Delay):

Serial took: 461ms for 20
 - Individual times 28,8,117,19,14,11,10,12,5,8,9,11,18,15,79,23,14,16,14,13

Cosmos Parallel
Parallel took: 231ms for 20
 - Individual times 17,15,23,39,45,52,72,74,80,91,96,98,108,117,123,128,139,146,147,145

Just Parallel (no cosmos)
Parallel took: 27ms for 20
 - Individual times 27,26,26,26,26,26,26,25,25,25,25,25,25,24,24,24,23,23,23,23

We're running this on an Azure VM in the same data center as Cosmos, with enough RUs that we aren't getting 429s, using Microsoft.Azure.Cosmos 3.2.0.

Full code sample

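    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Threading.Tasks;
    using Microsoft.Azure.Cosmos;

    class Program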
    {
        public static void Main(string[] args)
        {
            CosmosWriteTest().Wait();
        }

        public static async Task CosmosWriteTest()
        {
            var cosmosClient = new CosmosClient("todo", new CosmosClientOptions { ConnectionMode = ConnectionMode.Direct });
            var database = cosmosClient.GetDatabase("<ourcontainer>");
            var sdkContainerClient = database.GetContainer("<ourcontainer>");
            int insertCount = 25;
            //Warmup
            await sdkContainerClient.CreateItemAsync(new TestObject());

            //---Serially inserts into Cosmos---
            List<long> serialTimes = new List<long>();
            var serialTimer = Stopwatch.StartNew();
            Console.WriteLine("Cosmos Serial");
            for (int i = 0; i < insertCount; i++)
            {
                serialTimes.Add(await InsertCosmos(sdkContainerClient));
            }
            serialTimer.Stop();
            Console.WriteLine($"Serial took: {serialTimer.ElapsedMilliseconds}ms for {insertCount}");
            Console.WriteLine($" - Individual times {string.Join(",", serialTimes)}");

            //---Parallel inserts into Cosmos---
            Console.WriteLine(Environment.NewLine + "Cosmos Parallel");
            var parallelTimer = Stopwatch.StartNew();
            var taskList = new List<Task<long>>();
            for (int i = 0; i < insertCount; i++)
            {
                taskList.Add(InsertCosmos(sdkContainerClient));
            }
            var parallelTimes = await Task.WhenAll(taskList);

            parallelTimer.Stop();
            Console.WriteLine($"Parallel took: {parallelTimer.ElapsedMilliseconds}ms for {insertCount}");
            Console.WriteLine($" - Individual times {string.Join(",", parallelTimes)}");

            //---Testing parallelism minus cosmos---
            Console.WriteLine(Environment.NewLine + "Just Parallel (no cosmos)");
            var justParallelTimer = Stopwatch.StartNew();
            var noCosmosTaskList = new List<Task<long>>();
            for (int i = 0; i < insertCount; i++)
            {
                noCosmosTaskList.Add(InsertCosmos(sdkContainerClient, true));
            }
            var justParallelTimes = await Task.WhenAll(noCosmosTaskList);

            justParallelTimer.Stop();
            Console.WriteLine($"Parallel took: {justParallelTimer.ElapsedMilliseconds}ms for {insertCount}");
            Console.WriteLine($" - Individual times {string.Join(",", justParallelTimes)}");
        }

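        // Inserts one TestObject into Cosmos (or only waits 20 ms when justDelay is true) and returns the elapsed milliseconds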
        private static async Task<long> InsertCosmos(Container sdkContainerClient, bool justDelay = false)
        {
            var timer = Stopwatch.StartNew();
            if (!justDelay)
                await sdkContainerClient.CreateItemAsync(new TestObject());
            else
                await Task.Delay(20);

            timer.Stop();
            return timer.ElapsedMilliseconds;
        }

        //Test object to save to Cosmos
        public class TestObject
        {
            public string id { get; set; } = Guid.NewGuid().ToString();
            public string pKey { get; set; } = Guid.NewGuid().ToString();
            public string Field1 { get; set; } = "Testing this field";
            public double Number { get; set; } = 12345;
        }
    }

Tags: azure, azure-cosmosdb

Solution


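This is the scenario Bulk was introduced for. Bulk mode is currently in preview and is available in the 3.2.0-preview2 package.

To take advantage of Bulk, all you need to do is turn on the AllowBulkExecution flag: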

    new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true } );

This mode was built to benefit exactly the scenario you describe: a list of concurrent operations that need throughput.

We have a sample project here: https://github.com/Azure/azure-cosmos-dotnet-v3/tree/master/Microsoft.Azure.Cosmos.Samples/Usage/BulkSupport

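We're still working on the official documentation, but the idea is this: when concurrent operations are issued, instead of executing them as individual requests (as you are seeing now), the SDK groups them by partition affinity and executes them as grouped (batched) operations. This reduces the number of backend service calls and can increase throughput by between 50% and 100%, depending on the volume of operations. This mode consumes more RU/s, because it pushes a higher volume of operations per second than issuing the operations individually; so if you start hitting 429s, the bottleneck is now the provisioned RU/s.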

    var cosmosClient = new CosmosClient("todo", new CosmosClientOptions { AllowBulkExecution = true });
    var database = cosmosClient.GetDatabase("<ourcontainer>");
    var sdkContainerClient = database.GetContainer("<ourcontainer>");
    //The more operations the better, just 25 might not yield a great difference vs non bulk
    int insertCount = 10000;
    //Don't do any warmup

    // Issue all inserts concurrently; with AllowBulkExecution the SDK groups them into batched calls
    List<Task> operations = new List<Task>();
    var timer = Stopwatch.StartNew();
    for (int i = 0; i < insertCount; i++)
    {
        operations.Add(sdkContainerClient.CreateItemAsync(new TestObject()));
    }

    await Task.WhenAll(operations);
    timer.Stop();
    Console.WriteLine($"Bulk took: {timer.ElapsedMilliseconds}ms for {insertCount}");
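The grouping by partition affinity described above can be pictured with a toy simulation. This is a hypothetical sketch, not how Microsoft.Azure.Cosmos is actually implemented: `BulkGroupingSketch`, its string hash, `PartitionCount = 4`, and `MaxBatchSize = 100` are all made up for illustration. It only counts how many backend calls 10,000 concurrent inserts would need when sent one by one versus grouped per partition.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy model of "group concurrent operations by partition, then send each group
// as batched calls". Hypothetical: NOT how Microsoft.Azure.Cosmos is implemented.
public static class BulkGroupingSketch
{
    // Assumed values for illustration only; the real SDK discovers partition
    // key ranges from the container and applies its own batch limits.
    public const int PartitionCount = 4;
    public const int MaxBatchSize = 100;

    // Stand-in for partition routing: a deterministic string hash
    // (string.GetHashCode is randomized per process on .NET Core).
    public static int PartitionFor(string partitionKey)
    {
        int hash = 0;
        foreach (char c in partitionKey)
            hash = unchecked(hash * 31 + c);
        return (int)((uint)hash % PartitionCount);
    }

    // Groups operations by partition, then splits each group into chunks of at
    // most MaxBatchSize items. Each inner list stands for one backend call.
    public static List<List<string>> GroupIntoBatches(IEnumerable<string> partitionKeys)
    {
        return partitionKeys
            .GroupBy(PartitionFor)
            .SelectMany(partition => partition
                .Select((key, index) => (Key: key, Index: index))
                .GroupBy(item => item.Index / MaxBatchSize, item => item.Key)
                .Select(chunk => chunk.ToList()))
            .ToList();
    }

    public static void Main()
    {
        // Same volume as the bulk example: 10,000 inserts, each with a random partition key.
        var keys = Enumerable.Range(0, 10000)
            .Select(_ => Guid.NewGuid().ToString())
            .ToList();

        var batches = GroupIntoBatches(keys);
        Console.WriteLine($"Calls when sent individually: {keys.Count}");
        Console.WriteLine($"Calls when grouped by partition: {batches.Count}");
    }
}
```

With 10,000 random keys spread over the 4 assumed partitions, the grouped count comes out at roughly 100 calls instead of 10,000. The real reduction depends on the container's partition layout and on the SDK's own limits.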

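Important: this feature is still in preview. Because this mode is optimized for throughput, not latency, an individual operation will not have a particularly low latency.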
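To see why a throughput-oriented mode costs individual latency, here is a toy cost model. The model and its numbers are assumptions, not measurements of Cosmos DB: each backend call is assumed to pay a fixed overhead plus a fixed cost per item it carries, and calls are assumed to run one after another. `BatchTradeoffModel` is a hypothetical name.

```csharp
using System;

// Toy cost model with made-up numbers for the throughput-vs-latency trade-off.
// Assumption: every backend call pays a fixed overhead plus a cost per item carried.
public static class BatchTradeoffModel
{
    // Time until one call (and therefore every operation inside it) completes.
    public static double CallLatencyMs(int itemsPerCall, double overheadMs, double perItemMs)
        => overheadMs + itemsPerCall * perItemMs;

    // Operations completed per millisecond by a single sequential stream of calls.
    public static double ThroughputOpsPerMs(int itemsPerCall, double overheadMs, double perItemMs)
        => itemsPerCall / CallLatencyMs(itemsPerCall, overheadMs, perItemMs);

    public static void Main()
    {
        const double overheadMs = 1.0; // assumed, not measured
        const double perItemMs = 1.0;  // assumed, not measured

        foreach (int itemsPerCall in new[] { 1, 100 })
        {
            Console.WriteLine(
                $"{itemsPerCall} item(s) per call: " +
                $"latency {CallLatencyMs(itemsPerCall, overheadMs, perItemMs):F1} ms, " +
                $"throughput {ThroughputOpsPerMs(itemsPerCall, overheadMs, perItemMs):F2} ops/ms");
        }
    }
}
```

With these assumed costs, a 100-item call finishes after 101 ms instead of 2 ms, so every operation in it waits far longer, yet about twice as many operations complete per millisecond (roughly 0.99 versus 0.50). Real gains depend on how many calls run concurrently and on the provisioned RU/s.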

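If you want to optimize even further, and your data source gives you access to Streams (so you can avoid serialization), you can use the CreateItemStreamAsync SDK method for even better throughput.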

