首页 > 解决方案 > C# Parallel.Foreach...具有未知线程数的多线程

问题描述

我有一个必须在我的每个业务上运行的同步过程。企业的数量是不断变化的。

我已经阅读了有关 Thread 类、Parallelism..etc 的文档......我不确定我是否理解如何在不知道/命名预定义线程数的情况下做到这一点......在这种情况下,这个数字是未知的。出于这个原因,我找到了 Parallel.ForEach ......因为我希望同时运行未知数量的操作

我的同步操作每 10 分钟运行一次。它们每个都需要一两分钟才能运行。显然,我不能迭代地运行它们,因为当它们完成时,下一个调用将被触发。

我想在单独的线程中同时运行它们。虽然他们每个人都应该有唯一的 API 密钥,但他们不共享内存或数据,也不会修改任何共享数据。

为此,我对如何进行多线程进行了一些研究......我认为 Parallel.ForEach 会成功......

我需要语法方面的帮助...

这是在 Worker 服务中...我有一个私有方法SyncBusiness(int businessId),它调用一个 API 端点来同步业务。简单..只需要调用方法的帮助吗?

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    var businessIds = (from x in _db.Poslookup
                       select x.BusinessId).Distinct();

    while (!stoppingToken.IsCancellationRequested)
    {
        // Want to multi-thread a sync for each of the businesses in businessIds
        Parallel.ForEach(businessIds, i => { 
            await SyncBusiness(i)
        });

        _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
        await Task.Delay(600000, stoppingToken);
    }
}

此外,请评论有关可扩展性、线程限制等方面的任何问题。如果我发展到数千家企业进行同步,我可能会遇到麻烦的任何领域...也许是关于阅读有关同步操作和可扩展性?

太感谢了。干杯。

标签: c#multithreadingparallel.foreach

解决方案


正如其他人所指出的,您不能使用asyncwith Parallel.ForEach。但是,您可以通过SyncBusiness一次启动所有调用然后使用来使异步代码并发Task.WhenAll

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
  var businessIds = (from x in _db.Poslookup
                     select x.BusinessId).Distinct();

  while (!stoppingToken.IsCancellationRequested)
  {
    var tasks = businessIds.Select(SyncBusiness).ToList();
    await Task.WhenAll(tasks);

    _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
    await Task.Delay(600000, stoppingToken);
  }
}

我还建议使您的数据库查找异步:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
  while (!stoppingToken.IsCancellationRequested)
  {
    var businessIds = await (from x in _db.Poslookup
                       select x.BusinessId).Distinct().ToListAsync();

    var tasks = businessIds.Select(SyncBusiness).ToList();
    await Task.WhenAll(tasks);

    _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
    await Task.Delay(600000, stoppingToken);
  }
}

最后的观察是,这段代码当前同步所有业务,然后在其工作之间等待十分钟。如果您希望它每 10 分钟开始运行一次,那么您可以在方法开始时启动计时器:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
  while (!stoppingToken.IsCancellationRequested)
  {
    _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
    var timerTask = Task.Delay(TimeSpan.FromMinutes(10), stoppingToken);
    var businessIds = await (from x in _db.Poslookup
                       select x.BusinessId).Distinct().ToListAsync();

    var tasks = businessIds.Select(SyncBusiness).ToList();
    tasks.Add(timerTask);
    await Task.WhenAll(tasks);
  }
}

推荐阅读