首页 > 解决方案 > 如何使用 .NET Core 2.1 和 Stream API 在 Cosmos DB 中批量插入

问题描述

我正在尝试使用此 CosmosDB 示例实现批量插入。此示例使用 .NET Core 3.* 创建并支持 System.Text.Json。

使用 CreateItemAsync 方法时,它可以完美运行:

    var concurrentTasks = new List<Task<ItemResponse<Notification>>>();
    foreach (var entity in entities)
    {
        entity.Id = GenerateId(entity);

        var requestOptions = new ItemRequestOptions();
        requestOptions.EnableContentResponseOnWrite = false; // We don't need to get the entire body returend.
        concurrentTasks.Add(Container.CreateItemAsync(entity, new PartitionKey(entity.UserId), requestOptions));
    }

    await Task.WhenAll(concurrentTasks);

但是,我试图通过将数据直接流式传输到 CosmosDB 来查看是否可以减少 RU 的数量,希望 CosmosDB 不会因反序列化 JSON 本身而向我收费。

我正在使用 .NET Core 2.1 和 Newtonsoft.Json。这是我的代码,它不返回成功的状态代码。响应标头中的子状态代码为“0”。

    Notification[] notifications = entities.ToArray();
    var itemsToInsert = new Dictionary<PartitionKey, Stream>();

    foreach (var notification in notifications)
    {
        MemoryStream ms = new MemoryStream();
        StreamWriter writer = new StreamWriter(ms);
        JsonTextWriter jsonWriter = new JsonTextWriter(writer);
        JsonSerializer ser = new JsonSerializer();
                
        ser.Serialize(jsonWriter, notification);

        await jsonWriter.FlushAsync();
        await writer.FlushAsync();

        itemsToInsert.Add(new PartitionKey(notification.UserId), ms);
    }

    List<Task> tasks = new List<Task>(notifications.Length);
    foreach (KeyValuePair<PartitionKey, Stream> item in itemsToInsert)
    {
        tasks.Add(Container.CreateItemStreamAsync(item.Value, item.Key)
            .ContinueWith((Task<ResponseMessage> task) =>
            {
                using (ResponseMessage response = task.Result)
                {
                    if (!response.IsSuccessStatusCode)
                    {
                        Console.WriteLine($"Received {response.StatusCode} ({response.ErrorMessage}).");
                    }
                    else
                    {
                    }
                }
            }));
    }

    // Wait until all are done
    await Task.WhenAll(tasks);

response.StatusCode:BadRequest response.ErrorMessage:空

我假设我没有以正确的方式序列化到 Stream 中。有人有线索吗?

更新

我发现新的 System.Text.Json 包也实现了 .NET Standard 2.0,所以我从 NUget 安装了它。现在我可以从 Github 复制前面提到的示例代码。

        Notification[] notifications = entities.ToArray();
        var itemsToInsert = new List<Tuple<PartitionKey, Stream>>();

        foreach (var notification in notifications)
        {
            notification.id = $"{notification.UserId}:{Guid.NewGuid()}";

            MemoryStream stream = new MemoryStream();
            await JsonSerializer.SerializeAsync(stream, notification);

            itemsToInsert.Add(new Tuple<PartitionKey, Stream>(new PartitionKey(notification.RoleId), stream));
        }

        List<Task> tasks = new List<Task>(notifications.Length);
        foreach (var item in itemsToInsert)
        {
            tasks.Add(Container.CreateItemStreamAsync(item.Item2, item.Item1)
                .ContinueWith((Task<ResponseMessage> task) =>
                {
                    using (ResponseMessage response = task.Result)
                    {
                        if (!response.IsSuccessStatusCode)
                        {
                            Console.WriteLine($"Received {response.StatusCode} ({response.ErrorMessage}).");
                        }
                        else
                        {
                        }
                    }
                }));
        }

        // Wait until all are done
        await Task.WhenAll(tasks);

我仔细检查了 BulkInsert 是否已启用(否则第一种方法也不起作用)。仍然有一个 BadRequest 和一个用于 errorMessage 的 NULL。

我还检查了数据是否没有添加到容器中,尽管有 BadRequest。

标签: c#azure-cosmosdbbulkinsertasp.net-core-2.1azure-cosmosdb-sqlapi

解决方案


我发现了问题。

我已经使用以下选项设置了我的 Cosmos Context:

var cosmosSerializationOptions = new CosmosSerializationOptions();
cosmosSerializationOptions.PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase;

CosmosClientOptions cosmosClientOptions = new CosmosClientOptions();
cosmosClientOptions.SerializerOptions = cosmosSerializationOptions;

因此CamelCase约定。在我的第一个(工作)代码示例中,我会让 CosmosDB 上下文反序列化为 JSON。他将使用此 CamelCase 约定进行序列化,因此我的 PartionKeyUserId将被序列化为userId.

但是,为了减少一些 RU,我将使用CreateItemStreamAsync让我负责序列化的 RU。还有一个错误,我的财产被定义为:

public int UserId { get; set; }

所以他会被序列化为 json UserId: 1

但是,分区键定义为/userId. 因此,如果我添加 JsonPropertyName 属性,它会起作用:

[JsonPropertyName("userId")]
public int UserId { get; set; } 

...如果只有一条错误消息会告诉我。

使用这种CreateItemStream方法可以节省大约 3% 的 RU。但是,随着时间的推移,我猜这会慢慢节省一些 RU。


推荐阅读