c# - 如何使用 .NET Core 2.1 和 Stream API 在 Cosmos DB 中批量插入
问题描述
我正在尝试使用此 CosmosDB 示例实现批量插入。此示例使用 .NET Core 3.* 创建并支持 System.Text.Json。
使用 CreateItemAsync 方法时,它可以完美运行:
var concurrentTasks = new List<Task<ItemResponse<Notification>>>();
foreach (var entity in entities)
{
entity.Id = GenerateId(entity);
var requestOptions = new ItemRequestOptions();
requestOptions.EnableContentResponseOnWrite = false; // We don't need to get the entire body returend.
concurrentTasks.Add(Container.CreateItemAsync(entity, new PartitionKey(entity.UserId), requestOptions));
}
await Task.WhenAll(concurrentTasks);
但是,我试图通过将数据直接流式传输到 CosmosDB 来查看是否可以减少 RU 的数量,希望 CosmosDB 不会因反序列化 JSON 本身而向我收费。
我正在使用 .NET Core 2.1 和 Newtonsoft.Json。这是我的代码,它不返回成功的状态代码。响应标头中的子状态代码为“0”。
Notification[] notifications = entities.ToArray();
var itemsToInsert = new Dictionary<PartitionKey, Stream>();
foreach (var notification in notifications)
{
MemoryStream ms = new MemoryStream();
StreamWriter writer = new StreamWriter(ms);
JsonTextWriter jsonWriter = new JsonTextWriter(writer);
JsonSerializer ser = new JsonSerializer();
ser.Serialize(jsonWriter, notification);
await jsonWriter.FlushAsync();
await writer.FlushAsync();
itemsToInsert.Add(new PartitionKey(notification.UserId), ms);
}
List<Task> tasks = new List<Task>(notifications.Length);
foreach (KeyValuePair<PartitionKey, Stream> item in itemsToInsert)
{
tasks.Add(Container.CreateItemStreamAsync(item.Value, item.Key)
.ContinueWith((Task<ResponseMessage> task) =>
{
using (ResponseMessage response = task.Result)
{
if (!response.IsSuccessStatusCode)
{
Console.WriteLine($"Received {response.StatusCode} ({response.ErrorMessage}).");
}
else
{
}
}
}));
}
// Wait until all are done
await Task.WhenAll(tasks);
response.StatusCode:BadRequest response.ErrorMessage:空
我假设我没有以正确的方式序列化到 Stream 中。有人有线索吗?
更新
我发现新的 System.Text.Json 包也实现了 .NET Standard 2.0,所以我从 NUget 安装了它。现在我可以从 Github 复制前面提到的示例代码。
Notification[] notifications = entities.ToArray();
var itemsToInsert = new List<Tuple<PartitionKey, Stream>>();
foreach (var notification in notifications)
{
notification.id = $"{notification.UserId}:{Guid.NewGuid()}";
MemoryStream stream = new MemoryStream();
await JsonSerializer.SerializeAsync(stream, notification);
itemsToInsert.Add(new Tuple<PartitionKey, Stream>(new PartitionKey(notification.RoleId), stream));
}
List<Task> tasks = new List<Task>(notifications.Length);
foreach (var item in itemsToInsert)
{
tasks.Add(Container.CreateItemStreamAsync(item.Item2, item.Item1)
.ContinueWith((Task<ResponseMessage> task) =>
{
using (ResponseMessage response = task.Result)
{
if (!response.IsSuccessStatusCode)
{
Console.WriteLine($"Received {response.StatusCode} ({response.ErrorMessage}).");
}
else
{
}
}
}));
}
// Wait until all are done
await Task.WhenAll(tasks);
我仔细检查了 BulkInsert 是否已启用(否则第一种方法也不起作用)。仍然有一个 BadRequest 和一个用于 errorMessage 的 NULL。
我还检查了数据是否没有添加到容器中,尽管有 BadRequest。
解决方案
我发现了问题。
我已经使用以下选项设置了我的 Cosmos Context:
var cosmosSerializationOptions = new CosmosSerializationOptions();
cosmosSerializationOptions.PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase;
CosmosClientOptions cosmosClientOptions = new CosmosClientOptions();
cosmosClientOptions.SerializerOptions = cosmosSerializationOptions;
因此CamelCase
约定。在我的第一个(工作)代码示例中,我会让 CosmosDB 上下文反序列化为 JSON。他将使用此 CamelCase 约定进行序列化,因此我的 PartionKeyUserId
将被序列化为userId
.
但是,为了减少一些 RU,我将使用CreateItemStreamAsync
让我负责序列化的 RU。还有一个错误,我的财产被定义为:
public int UserId { get; set; }
所以他会被序列化为 json UserId: 1
。
但是,分区键定义为/userId
. 因此,如果我添加 JsonPropertyName 属性,它会起作用:
[JsonPropertyName("userId")]
public int UserId { get; set; }
...如果只有一条错误消息会告诉我。
使用这种CreateItemStream
方法可以节省大约 3% 的 RU。但是,随着时间的推移,我猜这会慢慢节省一些 RU。
推荐阅读
- java - 如何在 Java 中打印 char[]?
- javascript - 如何将数组从js传递到控制器
- python - 如何在变量下的 .txt 文件中打印字符串
- javascript - Javascript 新手制作小费计算器无法弄清楚它为什么不加载?
- python - tz_localize: KeyError: ('Asia/Singapore', u'occurrred at index 0')
- java - 方法被调用了 8 次,我不明白为什么?
- bash - 查找具有特定扩展名的文件
- javascript - 在 Next.js 中为 styled-jsx 动态添加样式
- sql-server - 无法使用 django-pyodbc-azure 2.1.0.0 连接到 MSSQL
- excel - 用户表单输入后如何自动向下滚动到新数据