Parallel StartsWith queries against Azure Table Storage

Problem description

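I have the following method that performs a startsWith query on RowKey against a table in Azure Table Storage. I now want to run several startsWith queries on RowKey in parallel.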

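Is it possible to create a parallel method that simply calls my existing method, or do I have to write a parallel version of the existing method?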

Here is my current startsWith method:

public async Task<IEnumerable<T>> RowKeyStartsWith<T>
                            (string searchString,
                            string tableName,
                            string partitionKey,
                            string columnName = "RowKey") where T : ITableEntity, new()
{
    // Make sure we have a search string
    if (string.IsNullOrEmpty(searchString)) return null;

    // Get CloudTable
    var table = GetTable(tableName);

    char lastChar = searchString[searchString.Length - 1];
    char nextLastChar = (char)((int)lastChar + 1);
    string nextSearchStr = searchString.Substring(0, searchString.Length - 1) + nextLastChar;

    // Define query segment(s)
    string prefixCondition = TableQuery.CombineFilters(
         TableQuery.GenerateFilterCondition(columnName, QueryComparisons.GreaterThanOrEqual, searchString),
              TableOperators.And,
              TableQuery.GenerateFilterCondition(columnName, QueryComparisons.LessThan, nextSearchStr)
         );

    string filterString = TableQuery.CombineFilters(
            TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, partitionKey),
               TableOperators.And,
               prefixCondition
        );

    // Create final query
    var query = new TableQuery<T>().Where(filterString);

    // Declare result variable
    var result = new List<T>();

    // Execute query asynchronously
    TableContinuationToken continuationToken = null;
    do
    {
        Task<TableQuerySegment<T>> querySegment = table.ExecuteQuerySegmentedAsync(query, continuationToken);
        TableQuerySegment<T> segment = await querySegment;
        result.AddRange(segment.ToList());
        continuationToken = segment.ContinuationToken;
    } while (continuationToken != null);

    return result;
}
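The filter above works because, assuming the service compares string properties ordinally, every key that starts with searchString sorts in the half-open range [searchString, nextSearchStr). The sketch below isolates that computation so it can be checked without a storage account; the PrefixRange class and its method names are hypothetical. It also rejects a last character of char.MaxValue, a case the original method does not guard against: there the increment wraps around to '\0' and the range becomes empty.

```csharp
using System;

// Hypothetical helper mirroring the upper-bound trick in RowKeyStartsWith:
// every string starting with `prefix` sorts (ordinally) in [prefix, UpperBound(prefix)).
public static class PrefixRange
{
    public static string UpperBound(string prefix)
    {
        if (string.IsNullOrEmpty(prefix))
            throw new ArgumentException("Prefix must be non-empty.", nameof(prefix));

        char last = prefix[prefix.Length - 1];
        if (last == char.MaxValue)
            throw new ArgumentException("Last character cannot be incremented.", nameof(prefix));

        // Same computation as nextSearchStr: replace the last char with its successor.
        return prefix.Substring(0, prefix.Length - 1) + (char)(last + 1);
    }

    // Mirrors the table filter: value >= prefix AND value < upper bound (ordinal comparison assumed).
    public static bool InRange(string value, string prefix) =>
        string.CompareOrdinal(value, prefix) >= 0 &&
        string.CompareOrdinal(value, UpperBound(prefix)) < 0;
}
```

For example, the prefix "abc" yields the bound "abd", so "abcxyz" matches while "abd" and "ab" do not.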

Tags: azure, azure-storage, azure-table-storage

Solution


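Is it possible to create a parallel method that simply calls my existing method, or do I have to write a parallel version of the existing method?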

As far as I understand, you can reuse your existing method and run the queries in multiple tasks, as follows:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage.Table; //DynamicTableEntity (or Microsoft.Azure.Cosmos.Table, depending on your SDK)
using Newtonsoft.Json;                      //JsonConvert

//for storing the query results
ConcurrentDictionary<string, object> resultDics = new ConcurrentDictionary<string, object>();

//simulate your search parameters
List<RowKeyStartsWithParamModel> rowKeySearchs = Enumerable.Range(1, 10)
    .Select(i => new RowKeyStartsWithParamModel()
    {
        SearchString = i.ToString(),
        TableName = "tablename",
        ColumnName = "RowKey",          //property names are case-sensitive
        ParationKey = "partitionKey"
    }).ToList();

//create one task per search; each task calls your existing RowKeyStartsWith
var tasks = rowKeySearchs.Select(item => Task.Run(async () =>
{
    //T must implement ITableEntity and have a parameterless constructor, so string cannot be used here
    var results = await RowKeyStartsWith<DynamicTableEntity>(item.SearchString, item.TableName, item.ParationKey, item.ColumnName);
    //add retrieved results
    resultDics.TryAdd(item.SearchString, results);
}));

//block the current thread until all tasks have completed
Task.WaitAll(tasks.ToArray());

//print all retrieved results
foreach (var item in resultDics)
{
    Console.WriteLine($"{item.Key},{JsonConvert.SerializeObject(item.Value)}");
}
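If the calling code can itself be async, a minimal sketch of the same fan-out without Task.Run or a blocking Task.WaitAll might look like the following. FakeRowKeyStartsWithAsync is a hypothetical in-memory stand-in for RowKeyStartsWith, and the SemaphoreSlim throttle (maxConcurrency) is an added assumption that caps how many queries run at once.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class FanOutSketch
{
    // Hypothetical stand-in for RowKeyStartsWith: filters an in-memory list
    // instead of querying Azure Table Storage.
    static readonly string[] RowKeys = { "1-a", "1-b", "2-a", "3-a", "3-b", "3-c" };

    static async Task<IEnumerable<string>> FakeRowKeyStartsWithAsync(string prefix)
    {
        await Task.Delay(10); // simulate network latency
        return RowKeys.Where(k => k.StartsWith(prefix, StringComparison.Ordinal)).ToList();
    }

    // Starts one query per prefix; at most maxConcurrency queries run at the same time.
    public static async Task<IDictionary<string, IEnumerable<string>>> QueryAllAsync(
        IEnumerable<string> prefixes, int maxConcurrency)
    {
        var results = new ConcurrentDictionary<string, IEnumerable<string>>();
        using var throttle = new SemaphoreSlim(maxConcurrency);

        var tasks = prefixes.Select(async prefix =>
        {
            await throttle.WaitAsync();
            try
            {
                results[prefix] = await FakeRowKeyStartsWithAsync(prefix);
            }
            finally
            {
                throttle.Release();
            }
        });

        // Asynchronously wait for every query instead of blocking a thread.
        await Task.WhenAll(tasks);
        return results;
    }
}
```

To query the real table, replace the stub call with RowKeyStartsWith<DynamicTableEntity>(...). The throttle is optional; it mainly keeps a large batch of prefixes from issuing all requests simultaneously.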

Alternatively, you can use Parallel.ForEach as follows:

Parallel.ForEach(rowKeySearchs, async (item) =>
{
    var results = await RowKeyStartsWith<DynamicTableEntity>(item.SearchString, item.TableName, item.ParationKey, item.ColumnName);
    resultDics.TryAdd(item.SearchString, results);
});

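Note: because each iteration's delegate uses await, it runs as an async void method. Parallel.ForEach only waits until each delegate reaches its first await, so the query results are not yet available when Parallel.ForEach returns.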
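The self-contained sketch below illustrates that note by counting how many iterations have finished at the moment Parallel.ForEach returns. Task.Delay stands in for the table query; the class and method names are hypothetical.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncLambdaDemo
{
    // Returns how many iterations had finished when Parallel.ForEach returned.
    public static int CompletedWhenForEachReturned(int iterations)
    {
        int completed = 0;

        // The async lambda is converted to Action<int>, i.e. an async void method,
        // so Parallel.ForEach cannot wait for the code after the await.
        Parallel.ForEach(Enumerable.Range(0, iterations), async i =>
        {
            await Task.Delay(500); // stands in for the table query
            Interlocked.Increment(ref completed);
        });

        return Volatile.Read(ref completed);
    }
}
```

With a 500 ms delay the returned count is expected to be lower than the number of iterations, usually 0.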

To retrieve the results synchronously with the snippet above, you can use either of the following approaches:

1) Use a non-async delegate and block on RowKeyStartsWith in each iteration of Parallel.ForEach, so every result is retrieved synchronously:

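//inside a non-async Parallel.ForEach delegate: blocks the worker thread until the query completes
var results = RowKeyStartsWith<DynamicTableEntity>(item.SearchString, item.TableName, item.ParationKey, item.ColumnName).Result;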
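On .NET 6 or later, Parallel.ForEachAsync (a different API from the Parallel.ForEach used above) awaits each async body, which avoids both the async void problem and blocking on .Result. A minimal sketch, where FakeQueryAsync is a hypothetical stand-in for RowKeyStartsWith and MaxDegreeOfParallelism = 4 is an arbitrary choice:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ForEachAsyncSketch
{
    // Hypothetical stand-in for RowKeyStartsWith (in-memory, no Azure calls).
    static async Task<List<string>> FakeQueryAsync(string prefix)
    {
        await Task.Delay(10); // simulate network latency
        return new[] { "1-a", "1-b", "2-a" }
            .Where(k => k.StartsWith(prefix, StringComparison.Ordinal))
            .ToList();
    }

    // Requires .NET 6+: Parallel.ForEachAsync awaits every async body before completing.
    public static async Task<ConcurrentDictionary<string, List<string>>> RunAsync(IEnumerable<string> prefixes)
    {
        var results = new ConcurrentDictionary<string, List<string>>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

        await Parallel.ForEachAsync(prefixes, options, async (prefix, cancellationToken) =>
        {
            results[prefix] = await FakeQueryAsync(prefix);
        });

        return results; // every query has completed at this point
    }
}
```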

2) Create one WaitHandle per query and block until all of them have been signaled:

var waitHandles = rowKeySearchs.Select(d => new EventWaitHandle(false, EventResetMode.ManualReset)).ToArray();
Parallel.ForEach(rowKeySearchs, async (item, loopState, index) =>
{
    var results = await RowKeyStartsWith<DynamicTableEntity>(item.SearchString, item.TableName, item.ParationKey, item.ColumnName);
    resultDics.TryAdd(item.SearchString, results);
    waitHandles[index].Set(); //signal that this query has finished
});
//block the current thread until all EventWaitHandles are signaled
//(WaitHandle.WaitAll accepts at most 64 handles and throws NotSupportedException on STA threads)
WaitHandle.WaitAll(waitHandles);
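Because WaitHandle.WaitAll is limited to 64 handles, approach 2) fails for larger batches. The sketch below swaps in a single CountdownEvent (a different technique from the original answer); FakeQueryAsync is a hypothetical stand-in for RowKeyStartsWith. Like approach 2), it blocks the calling thread, so it assumes there is no SynchronizationContext (for example, a console app); on a UI thread, continuations that captured the UI context could not resume and Wait() would deadlock.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CountdownSketch
{
    // Hypothetical stand-in for RowKeyStartsWith.
    static async Task<string> FakeQueryAsync(int i)
    {
        await Task.Delay(10); // simulate network latency
        return "result-" + i;
    }

    public static ConcurrentDictionary<int, string> Run(int count)
    {
        var results = new ConcurrentDictionary<int, string>();
        var done = new CountdownEvent(count); // one signal expected per query

        Parallel.ForEach(Enumerable.Range(0, count), async i =>
        {
            results[i] = await FakeQueryAsync(i);
            done.Signal(); // decrement the remaining count
        });

        done.Wait(); // blocks until all `count` signals have arrived
        return results;
    }
}
```

Since one event replaces the array of handles, a batch of 100 queries works where WaitHandle.WaitAll would reject 100 handles.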
