Azure Cosmos BulkExecutor: method not found

Problem description

I am using Microsoft.Azure.DocumentDB v2.1.3 and Microsoft.Azure.CosmosDB.BulkExecutor v1.4.0. When I run this code:

public async Task CreateMultipleAsync(IEnumerable<JObject> models)
{
    var collectionLink = UriFactory.CreateDocumentCollectionUri(_databaseName, _collectionName);
    var collection = await _client.ReadDocumentCollectionAsync(collectionLink);
    var bulkExecutor = new BulkExecutor(_client, collection);

    try
    {
        await bulkExecutor.InitializeAsync();
        var response = await bulkExecutor.BulkImportAsync(models, true);
    } catch (Exception ex)
    {

    }
}

I get this error:

Method not found: 'System.Threading.Tasks.Task`1 Microsoft.Azure.Documents.Routing.PartitionKeyRangeCache.TryLookupAsync(System.String, Microsoft.Azure.Documents.Routing.CollectionRoutingMap, System.Threading.CancellationToken, Boolean)'.

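I have seen other people run into this problem (it seems to keep coming up), and the suggested fix is to use v2.1.3 and v1.4.0 respectively, which I am already doing.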

I am targeting .NET Framework 4.6.2. Does anyone know why this happens?

Tags: c#, azure, azure-cosmosdb

Solution


@r3plica,

This is an old issue that was fixed in BulkExecutor 1.1.2 and later.
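If the error still appears with a fixed version installed, it may be worth checking which assembly versions are actually in play at runtime, since a "Method not found" error usually means one assembly was compiled against a different version of a dependency than the one that got loaded. The following is only a minimal sketch using standard reflection; the class name `AssemblyVersionCheck` and its two methods are hypothetical helpers, not part of either library.

```csharp
using System;
using System.Linq;
using System.Reflection;

// Hypothetical helper (not part of the Cosmos DB libraries).
static class AssemblyVersionCheck
{
    // Version of `dependencyName` that `consumer` was compiled against,
    // or null if `consumer` does not reference that assembly.
    public static Version CompiledAgainst(Assembly consumer, string dependencyName)
    {
        return consumer.GetReferencedAssemblies()
                       .FirstOrDefault(a => a.Name == dependencyName)
                       ?.Version;
    }

    // Version of `dependencyName` currently loaded in this AppDomain,
    // or null if it has not been loaded yet.
    public static Version Loaded(string dependencyName)
    {
        return AppDomain.CurrentDomain.GetAssemblies()
                        .FirstOrDefault(a => a.GetName().Name == dependencyName)
                        ?.GetName().Version;
    }
}
```

In your project you could call `AssemblyVersionCheck.CompiledAgainst(typeof(BulkExecutor).Assembly, "Microsoft.Azure.Documents.Client")` and compare it with `AssemblyVersionCheck.Loaded("Microsoft.Azure.Documents.Client")`. The assembly name is my assumption about what the DocumentDB package ships, so adjust it if your bin folder shows something else. If the two versions differ, an older copy of one of the packages is probably still being picked up.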

I tried to reproduce your issue by running the BulkExecutor BulkImportAsync method from the GitHub repo, and it worked fine for me.

I used the following method to create the partitioned collection:

internal static async Task<DocumentCollection> CreatePartitionedCollectionAsync(
    DocumentClient client, string databaseName, string collectionName, int collectionThroughput)
{
    // The partition key path is read from app settings.
    PartitionKeyDefinition partitionKey = new PartitionKeyDefinition
    {
        Paths = new Collection<string> { ConfigurationManager.AppSettings["CollectionPartitionKey"] }
    };
    DocumentCollection collection = new DocumentCollection { Id = collectionName, PartitionKey = partitionKey };

    // Create the collection with the requested throughput.
    // Exceptions propagate to the caller with their original stack trace.
    collection = await client.CreateDocumentCollectionAsync(
        UriFactory.CreateDatabaseUri(databaseName),
        collection,
        new RequestOptions { OfferThroughput = collectionThroughput });

    return collection;
}

Then I used BulkImport in the Main method as follows:

    // Prepare for bulk import.

    // Create documents with a simple partition key here.
    string partitionKeyProperty = dataCollection.PartitionKey.Paths[0].Replace("/", "");

    long numberOfDocumentsToGenerate = long.Parse(ConfigurationManager.AppSettings["NumberOfDocumentsToImport"]);
    int numberOfBatches = int.Parse(ConfigurationManager.AppSettings["NumberOfBatches"]);
    long numberOfDocumentsPerBatch = (long)Math.Floor(((double)numberOfDocumentsToGenerate) / numberOfBatches);

    // Set retry options high for initialization (default values).
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;

    IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
    await bulkExecutor.InitializeAsync();

    // Set retries to 0 to pass control to bulk executor.
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;

    BulkImportResponse bulkImportResponse = null;
    long totalNumberOfDocumentsInserted = 0;
    double totalRequestUnitsConsumed = 0;
    double totalTimeTakenSec = 0;

    var tokenSource = new CancellationTokenSource();
    var token = tokenSource.Token;

    // Note: `i` and `documentsToImportInBatch` are defined outside this excerpt.
    var tasks = new List<Task>();

        tasks.Add(Task.Run(async () =>
        {
            Trace.TraceInformation(String.Format("Executing bulk import for batch {0}", i));
            do
            {
                try
                {
                    bulkImportResponse = await bulkExecutor.BulkImportAsync(
                        documents: documentsToImportInBatch,
                        enableUpsert: true,
                        disableAutomaticIdGeneration: true,
                        maxConcurrencyPerPartitionKeyRange: null,
                        maxInMemorySortingBatchSize: null,
                        cancellationToken: token);
                }
                catch (DocumentClientException de)
                {
                    Trace.TraceError("Document client exception: {0}", de);
                    break;
                }
                catch (Exception e)
                {
                    Trace.TraceError("Exception: {0}", e);
                    break;
                }
            } while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);



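            // bulkImportResponse can still be null if every attempt so far threw, so guard before reading it.
            if (bulkImportResponse != null)
            {
                totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
                totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
                totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
            }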
        },
        token));
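The excerpt above stops once the task has been queued, so the totals are only meaningful after the tasks have been awaited. As a rough, self-contained sketch of that last step (not the code from the repo): `BatchImportSketch` and `ImportBatchAsync` are hypothetical names, and `ImportBatchAsync` is a stand-in that only simulates an import so the example runs without a Cosmos DB account.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class BatchImportSketch
{
    // Hypothetical stand-in for one bulk import call: it waits briefly and
    // reports every document in the batch as imported.
    static async Task<long> ImportBatchAsync(IReadOnlyCollection<string> batch, CancellationToken token)
    {
        await Task.Delay(10, token); // simulate I/O
        return batch.Count;
    }

    public static async Task<long> ImportAllAsync(
        IEnumerable<IReadOnlyCollection<string>> batches, CancellationToken token)
    {
        long totalImported = 0;
        var tasks = new List<Task>();

        foreach (var batch in batches)
        {
            tasks.Add(Task.Run(async () =>
            {
                long imported = await ImportBatchAsync(batch, token);
                // Several tasks may finish at the same time, so add atomically.
                Interlocked.Add(ref totalImported, imported);
            }, token));
        }

        // The total is only complete once every queued task has finished.
        await Task.WhenAll(tasks);
        return totalImported;
    }
}
```

With the real library, the body of each task would be the `BulkImportAsync` retry loop shown above. `Interlocked.Add` is used here because plain `+=` on a shared counter is not safe when more than one batch runs in parallel.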

Let me know if you need more details.

