首页 > 解决方案 > 通过使同步方法异步进行比线程更多的远程调用

问题描述

我有一堆都是同步的远程调用(第三方库)。它们中的大多数都需要很长时间,所以我不能更频繁地使用它们,然后每秒大约 5 到 10 次。这太慢了,因为我需要每几分钟至少调用 3000 次,如果服务停止一段时间,我需要调用更多次。客户端上几乎没有 CPU 工作。它获取数据,检查一些简单的条件并发出另一个它必须等待的调用。

使它们异步的最佳方法是什么(以异步方式调用它们-我想我需要一些异步包装器)以便我可以同时发出更多请求?目前它受到线程数(四个)的限制。

我正在考虑用它们来调用它们,Task.Run但我读到的每一篇文章都说它是针对 CPU 密集型工作的,并且它使用线程池线程。如果我理解正确,用这种方法我将无法打破线程限制,对吗?那么哪种方法实际上最适合这里?

怎么样Task.FromResult?我可以以比线程更多的数量异步等待此类方法吗?

public async Task<Data> GetDataTakingLotsOfTime(object id)
{
    var data = remoting.GetData(id);
    return await Task.FromResult(data);
}

标签: c#asynchronousasync-awaitremoting

解决方案


您正在执行远程调用,并且您的线程需要空闲等待远程调用的结果。在此等待期间,您的线程可以做一些有用的事情,例如执行其他远程调用。

当您的线程空闲等待其他进程完成时,例如写入磁盘、查询数据库或从 Internet 获取信息时,通常会在非异步函数旁边看到异步函数:Writeand WriteAsync, Sendand SendAsync.

如果在同步调用的最深层,您可以访问调用的异步版本,那么您的生活会很轻松。唉,您似乎没有这样的异步版本。

您提出的解决方案使用Task.Run的缺点是启动新线程(或从线程池运行一个线程)的开销。

您可以通过创建车间对象来降低此开销。在车间,一个专用线程(一个工人),或几个专用线程在一个输入点等待一个命令来做某事。线程执行任务并将结果发布到输出点。

研讨会的用户有一个访问点(前台?),他们可以在其中发布请求以做某事,并等待结果。

为此,我使用了System.Threading.Tasks.Dataflow.BufferBlock。安装 Nuget 包 TPL 数据流。

您可以将您的工作室专门用于只接受工作GetDataTakingLotsOfTime;我让我的工作室变得通用:我接受所有实现接口 IWork 的工作:

interface IWork
{
    void DoWork();
}

车间有两个BufferBlocks:一个输入工作请求,一个输出完成的工作。车间有一个线程(或多个线程)在输入处等待,BufferBlock直到作业到达。是否Work, 并在完成时将作业发布到输出BufferBlock

class WorkShop
{
    public WorkShop()
    {
         this.workRequests = new BufferBlock<IWork>();
         this.finishedWork = new BufferBlock<IWork>();
         this.frontOffice = new FrontOffice(this.workRequests, this.finishedWork);
    }

    private readonly BufferBlock<IWork> workRequests;
    private readonly BufferBlock<IWork> finishedWork;
    private readonly FrontOffice frontOffice;

    public FrontOffice {get{return this.frontOffice;} }

    public async Task StartWorkingAsync(CancellationToken token)
    {
        while (await this.workRequests.OutputAvailableAsync(token)
        {   // some work request at the input buffer
            IWork requestedWork = this.workRequests.ReceiveAsync(token);
            requestedWork.DoWork();
            this.FinishedWork.Post(requestedWork);
        }
        // if here: no work expected anymore:
        this.FinishedWork.Complete();
    }

    // function to close the WorkShop
    public async Task CloseShopAsync()
    {
         // signal that no more work is to be expected:
         this.WorkRequests.Complete();
         // await until the worker has finished his last job for the day:
         await this.FinishedWork.Completion();
    }
}

TODO:对 CancellationToken.CancellationRequested
的正确反应 TODO:对工作抛出的异常的正确反应
TODO:决定是否使用多个线程来完成工作

FrontOffice 有一个异步函数,它接受工作,将工作发送到 WorkRequests 并等待工作完成:

public async Task<IWork> OrderWorkAsync(IWork work, CancellationToken token)
{
    await this.WorkRequests.SendAsync(work, token);
    IWork finishedWork = await this.FinishedWork.ReceivedAsync(token);
    return finishedWork;
}

因此,您的进程创建了一个 WorkShop 对象并启动一个或多个将 StartWorking 的线程。

每当任何线程(包括您的主线程)需要以异步等待方式执行一些工作时:

  • 创建一个保存输入参数和 DoWork 函数的对象
  • 询问 FrontOffice 的研讨会
  • 等待 OrderWorkAsync

.

class InformationGetter : IWork
{
     public int Id {get; set;}                     // the input Id
     public Data FetchedData {get; private set;}   // the result from Remoting.GetData(id);
     public void DoWork()
     {
         this.FetchedData = remoting.GetData(this.Id);
     }
}

最后是你的遥控器的异步版本

async Task<Data> RemoteGetDataAsync(int id)
{
     // create the job to get the information:
     InformationGetter infoGetter = new InformationGetter() {Id = id};

     // go to the front office of the workshop and order to do the job
     await this.MyWorkShop.FrontOffice.OrderWorkAsync(infoGetter);
     return infoGetter.FetchedData;
}

推荐阅读