首页 > 解决方案 > 数据未插入 Cosmos C#

问题描述

我有一个项目清单(其中 32007 个)

我正在批量添加它们

但是,似乎有些没有被插入

更奇怪的是,如果我从头开始多次运行我的流程(即重新创建我的收藏),创建的项目数量会有所不同,我见过 32005、32003

我已经将我的收藏扩大到有很多 RU(自动缩放)到 25000

我将数据分成 100 份

我的逻辑如下

public async Task<List<Account>> ProcessAccountsAsync(List<Account> accounts)
{
   var cosmosConnection = await ConnectToDatabaseAsync().ConfigureAwait(false);
   var failedAccounts = new List<Account>();

   var accountsToInsert = new Dictionary<PartitionKey, Stream>(accounts.Count);
   Parallel.ForEach(accounts, (account) =>
   {
       var stream = new MemoryStream();
       var json = JsonConvert.SerializeObject(account, JsonHelper.DefaultSettings());
       stream.Write(Encoding.Default.GetBytes(json));
       stream.Position = 0;
       accountsToInsert.Add(new PartitionKey(account.Id), stream);
   });

   var tasks = new List<Task>(accounts.Count);
   foreach (var account in accountsToInsert)
   {
       tasks.Add(cosmosConnection.Container.CreateItemStreamAsync(account.Value, account.Key)
            .ContinueWith((Task<ResponseMessage> task) =>
            {
                using (var response = task.Result)
                {
                    if (!response.IsSuccessStatusCode)
                    {
                        var actualAccount = accounts.FirstOrDefault(x => account.Key.ToString().Contains(x.Id));
                        Debug.WriteLine($"Processing Account : {actualAccount?.ArcContactId} Received {response.StatusCode} ({response.ErrorMessage}).");
                        failedAccounts.Add(actualAccount);
                    }
                }
            }));
        }

        await Task.WhenAll(tasks);

        return failedAccounts;
    }

在我的调用逻辑中,我重试所有失败的帐户

var tidiedJson = JsonConvert.SerializeObject(list, Formatting.Indented);
accounts = JsonConvert.DeserializeObject <List<DomainModels.Versions.v2.Account>>(tidiedJson);

var failedAccounts = await cosmosAccountRepository.ProcessAccountsAsync(accounts);

while (failedAccounts.Count > 0)
{
    failedAccounts = await cosmosAccountRepository.ProcessAccountsAsync(failedAccounts);
}

我不知道为什么没有插入帐户以及为什么行为如此随机!

我已经尝试过各种吞吐量和批次大小,没有区别

任何人都可以看到任何明显的东西吗?

如果做不到这一点,我有一个ID字段的帐户,有没有办法找出哪些帐户不在数据库中,而不必逐个遍历所有32007个帐户,这显然不是一个好计划!

保罗

标签: c#azureazure-cosmosdbazure-cosmosdb-sqlapi

解决方案


推荐阅读