首页 > 解决方案 > 带有等待的每个循环的任务工厂

问题描述

我是任务新手,对使用有疑问。Task.Factory 是否为 foreach 循环中的所有项目触发或在“等待”处阻塞基本上使程序成为单线程?如果我正确地考虑了这一点,foreach 循环将启动所有任务和 .GetAwaiter().GetResult(); 阻塞主线程,直到最后一个任务完成。

另外,我只是想要一些匿名任务来加载数据。这会是一个正确的实现吗?我不是指异常处理,因为这只是一个示例。

为了清楚起见,我从外部 API 将数据加载到数据库中。这个是使用 FRED 数据库的。(https://fred.stlouisfed.org/),但我有几个我将完成整个传输(可能是 200k 数据点)。完成后,我会更新表格、刷新市场计算等。其中一些是实时的,一些是日终的。我还想说,我目前在 docker 中的所有东西都在工作,但一直在努力使用任务来更新代码以改进执行。

class Program
{
    private async Task SQLBulkLoader() 
    {

        foreach (var fileListObj in indicators.file_list)
        {
            await Task.Factory.StartNew(  () =>
            {

                string json = this.GET(//API call);

                SeriesObject obj = JsonConvert.DeserializeObject<SeriesObject>(json);

                DataTable dataTableConversion = ConvertToDataTable(obj.observations);
                dataTableConversion.TableName = fileListObj.series_id;

                using (SqlConnection dbConnection = new SqlConnection("SQL Connection"))
                {
                    dbConnection.Open();
                    using (SqlBulkCopy s = new SqlBulkCopy(dbConnection))
                    {
                      s.DestinationTableName = dataTableConversion.TableName;
                      foreach (var column in dataTableConversion.Columns)
                          s.ColumnMappings.Add(column.ToString(), column.ToString());
                      s.WriteToServer(dataTableConversion);
                    }

                  Console.WriteLine("File: {0} Complete", fileListObj.series_id);
                }
             });
        }            
    }

    static void Main(string[] args)
    {
        Program worker = new Program();
        worker.SQLBulkLoader().GetAwaiter().GetResult();
    }
}

标签: c#asynchronoustasktask-parallel-library

解决方案


您正在等待从返回的任务Task.Factory.StartNew确实使它成为有效的单线程。您可以通过这个简短的 LinqPad 示例看到一个简单的演示:

for (var i = 0; i < 3; i++)
{
    var index = i;
    $"{index} inline".Dump();
    await Task.Run(() =>
    {
        Thread.Sleep((3 - index) * 1000);
        $"{index} in thread".Dump();
    });
}

在这里,随着循环的进行,我们等待的时间更少。输出是:

0 内联
0 在线程中
1 内联
1 在线程中
2 内联
2 在线程中

如果您删除await前面的,StartNew您会看到它并行运行。正如其他人所提到的,您当然可以使用Parallel.ForEach,但是为了演示更多手动操作,您可以考虑这样的解决方案:

var tasks = new List<Task>();

for (var i = 0; i < 3; i++) 
{
    var index = i;
    $"{index} inline".Dump();
    tasks.Add(Task.Factory.StartNew(() =>
    {
        Thread.Sleep((3 - index) * 1000);
        $"{index} in thread".Dump();
    }));
}

Task.WaitAll(tasks.ToArray());

现在注意结果如何:

0 内联
1 内联
2 内联
2 在线程中
1 在线程中
0 在线程中


推荐阅读