azure - Cosmos 插入不会有效地并行化
问题描述
元问题:
我们正在从 EventHub 中提取数据,运行一些逻辑,并将其保存到 cosmos。目前 Cosmos 插入是我们的瓶颈。我们如何最大化我们的吞吐量?
详细信息
我们正在尝试优化我们的 Cosmos 吞吐量,并且 SDK 中似乎存在一些争用,这使得并行插入仅比串行插入快一点。
我们在逻辑上这样做:
for (int i = 0; i < insertCount; i++)
{
taskList.Add(InsertCosmos(sdkContainerClient));
}
var parallelTimes = await Task.WhenAll(taskList);
这是比较串行插入、并行插入和“伪造”插入(使用 Task.Delay)的结果:
Serial took: 461ms for 20
- Individual times 28,8,117,19,14,11,10,12,5,8,9,11,18,15,79,23,14,16,14,13
Cosmos Parallel
Parallel took: 231ms for 20
- Individual times 17,15,23,39,45,52,72,74,80,91,96,98,108,117,123,128,139,146,147,145
Just Parallel (no cosmos)
Parallel took: 27ms for 20
- Individual times 27,26,26,26,26,26,26,25,25,25,25,25,25,24,24,24,23,23,23,23
- 序列很明显(只需添加每个值)
- no cosmos(最后一次计时)也很明显(只取最小时间)
- 但是平行宇宙几乎没有平行化,这表明存在一些争论。
我们在 Azure 中的 VM(与 Cosmos 相同的数据中心)上运行它,有足够的 RU,所以没有得到 429,并使用 Microsoft.Azure.Cosmos 3.2.0。
完整代码示例
class Program
{
public static void Main(string[] args)
{
CosmosWriteTest().Wait();
}
public static async Task CosmosWriteTest()
{
var cosmosClient = new CosmosClient("todo", new CosmosClientOptions { ConnectionMode = ConnectionMode.Direct });
var database = cosmosClient.GetDatabase("<ourcontainer>");
var sdkContainerClient = database.GetContainer("<ourcontainer>");
int insertCount = 25;
//Warmup
await sdkContainerClient.CreateItemAsync(new TestObject());
//---Serially inserts into Cosmos---
List<long> serialTimes = new List<long>();
var serialTimer = Stopwatch.StartNew();
Console.WriteLine("Cosmos Serial");
for (int i = 0; i < insertCount; i++)
{
serialTimes.Add(await InsertCosmos(sdkContainerClient));
}
serialTimer.Stop();
Console.WriteLine($"Serial took: {serialTimer.ElapsedMilliseconds}ms for {insertCount}");
Console.WriteLine($" - Individual times {string.Join(",", serialTimes)}");
//---Parallel inserts into Cosmos---
Console.WriteLine(Environment.NewLine + "Cosmos Parallel");
var parallelTimer = Stopwatch.StartNew();
var taskList = new List<Task<long>>();
for (int i = 0; i < insertCount; i++)
{
taskList.Add(InsertCosmos(sdkContainerClient));
}
var parallelTimes = await Task.WhenAll(taskList);
parallelTimer.Stop();
Console.WriteLine($"Parallel took: {parallelTimer.ElapsedMilliseconds}ms for {insertCount}");
Console.WriteLine($" - Individual times {string.Join(",", parallelTimes)}");
//---Testing parallelism minus cosmos---
Console.WriteLine(Environment.NewLine + "Just Parallel (no cosmos)");
var justParallelTimer = Stopwatch.StartNew();
var noCosmosTaskList = new List<Task<long>>();
for (int i = 0; i < insertCount; i++)
{
noCosmosTaskList.Add(InsertCosmos(sdkContainerClient, true));
}
var justParallelTimes = await Task.WhenAll(noCosmosTaskList);
justParallelTimer.Stop();
Console.WriteLine($"Parallel took: {justParallelTimer.ElapsedMilliseconds}ms for {insertCount}");
Console.WriteLine($" - Individual times {string.Join(",", justParallelTimes)}");
}
//inserts
private static async Task<long> InsertCosmos(Container sdkContainerClient, bool justDelay = false)
{
var timer = Stopwatch.StartNew();
if (!justDelay)
await sdkContainerClient.CreateItemAsync(new TestObject());
else
await Task.Delay(20);
timer.Stop();
return timer.ElapsedMilliseconds;
}
//Test object to save to Cosmos
public class TestObject
{
public string id { get; set; } = Guid.NewGuid().ToString();
public string pKey { get; set; } = Guid.NewGuid().ToString();
public string Field1 { get; set; } = "Testing this field";
public double Number { get; set; } = 12345;
}
}
解决方案
这是引入 Bulk 的场景。批量模式目前处于预览状态,可在3.2.0-preview2包中使用。
要利用 Bulk,您需要做的是打开AllowBulkExecution
标志:
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true } );
制作此模式是为了使您描述的这种情况受益,即需要吞吐量的并发操作列表。
我们在这里有一个示例项目:https ://github.com/Azure/azure-cosmos-dotnet-v3/tree/master/Microsoft.Azure.Cosmos.Samples/Usage/BulkSupport
我们仍在处理官方文档,但想法是当并发操作发出时,而不是像您现在看到的那样将它们作为单独的请求执行,SDK 将根据分区亲和性对它们进行分组并将它们作为分组执行(批处理)操作,减少后端服务调用,并可能增加 50%-100% 之间的吞吐量,具体取决于操作量。此模式将消耗更多的 RU/s,因为它每秒推送的操作量比单独发出操作量大(因此,如果您达到 429 秒,则意味着瓶颈现在在预配的 RU/s 上)。
var cosmosClient = new CosmosClient("todo", new CosmosClientOptions { AllowBulkExecution = true });
var database = cosmosClient.GetDatabase("<ourcontainer>");
var sdkContainerClient = database.GetContainer("<ourcontainer>");
//The more operations the better, just 25 might not yield a great difference vs non bulk
int insertCount = 10000;
//Don't do any warmup
List<Task> operations = new List<Tasks>();
var timer = Stopwatch.StartNew();
for (int i = 0; i < insertCount; i++)
{
operations.Add(sdkContainerClient.CreateItemAsync(new TestObject()));
}
await Task.WhenAll(operations);
serialTimer.Stop();
重要提示:此功能仍处于预览阶段。由于这是针对吞吐量(而非延迟)优化的模式,因此您执行的任何单个操作都不会有很大的操作延迟。
如果您想进一步优化,并且您的数据源允许您访问 Streams(避免序列化),您可以使用CreateItemStream
SDK 方法获得更好的吞吐量。
推荐阅读
- facebook - Facebook Marketing Api 错误:(#2635) 您正在调用已弃用的广告 API 版本。请更新到最新版本:v3.2。”
- android - 自动点击提交按钮以及如何通过提交按钮的时间限制?
- angularjs - LocalStroage:计算本地存储中的项目数并使用 AngularJS 在 UI 中显示
- java - 来自hackerrank的特殊回文
- excel - 检查文件夹并比较 txt 文件的确切列表。还有一个带有文件夹选择器的公共变量,我无法在 msgbox 中显示
- reactjs - 如何在 LineChart 上正确获取 xAxis?
- xml - 如何将 DITA xml 转换为 CQ xml?
- c++ - 快速排序函数 C++ 1 参数 - 向量
- ruby-on-rails - 嵌套分组 ActiveRecord 的代码建议 | 导轨 5.2
- vb.net - 将数据从 System.Windows.Forms WebBrowser 传递到单独的网页