c# - 通过使同步方法异步进行比线程更多的远程调用
问题描述
我有一堆都是同步的远程调用(第三方库)。它们中的大多数都需要很长时间,所以我不能更频繁地使用它们,然后每秒大约 5 到 10 次。这太慢了,因为我需要每几分钟至少调用 3000 次,如果服务停止一段时间,我需要调用更多次。客户端上几乎没有 CPU 工作。它获取数据,检查一些简单的条件并发出另一个它必须等待的调用。
使它们异步的最佳方法是什么(以异步方式调用它们-我想我需要一些异步包装器)以便我可以同时发出更多请求?目前它受到线程数(四个)的限制。
我正在考虑用它们来调用它们,Task.Run
但我读到的每一篇文章都说它是针对 CPU 密集型工作的,并且它使用线程池线程。如果我理解正确,用这种方法我将无法打破线程限制,对吗?那么哪种方法实际上最适合这里?
怎么样Task.FromResult
?我可以以比线程更多的数量异步等待此类方法吗?
public async Task<Data> GetDataTakingLotsOfTime(object id)
{
var data = remoting.GetData(id);
return await Task.FromResult(data);
}
解决方案
您正在执行远程调用,并且您的线程需要空闲等待远程调用的结果。在此等待期间,您的线程可以做一些有用的事情,例如执行其他远程调用。
当您的线程空闲等待其他进程完成时,例如写入磁盘、查询数据库或从 Internet 获取信息时,通常会在非异步函数旁边看到异步函数:Write
and WriteAsync
, Send
and SendAsync
.
如果在同步调用的最深层,您可以访问调用的异步版本,那么您的生活会很轻松。唉,您似乎没有这样的异步版本。
您提出的解决方案使用Task.Run
的缺点是启动新线程(或从线程池运行一个线程)的开销。
您可以通过创建车间对象来降低此开销。在车间,一个专用线程(一个工人),或几个专用线程在一个输入点等待一个命令来做某事。线程执行任务并将结果发布到输出点。
研讨会的用户有一个访问点(前台?),他们可以在其中发布请求以做某事,并等待结果。
为此,我使用了System.Threading.Tasks.Dataflow.BufferBlock。安装 Nuget 包 TPL 数据流。
您可以将您的工作室专门用于只接受工作GetDataTakingLotsOfTime
;我让我的工作室变得通用:我接受所有实现接口 IWork 的工作:
interface IWork
{
void DoWork();
}
车间有两个BufferBlocks
:一个输入工作请求,一个输出完成的工作。车间有一个线程(或多个线程)在输入处等待,BufferBlock
直到作业到达。是否Work
, 并在完成时将作业发布到输出BufferBlock
class WorkShop
{
public WorkShop()
{
this.workRequests = new BufferBlock<IWork>();
this.finishedWork = new BufferBlock<IWork>();
this.frontOffice = new FrontOffice(this.workRequests, this.finishedWork);
}
private readonly BufferBlock<IWork> workRequests;
private readonly BufferBlock<IWork> finishedWork;
private readonly FrontOffice frontOffice;
public FrontOffice {get{return this.frontOffice;} }
public async Task StartWorkingAsync(CancellationToken token)
{
while (await this.workRequests.OutputAvailableAsync(token)
{ // some work request at the input buffer
IWork requestedWork = this.workRequests.ReceiveAsync(token);
requestedWork.DoWork();
this.FinishedWork.Post(requestedWork);
}
// if here: no work expected anymore:
this.FinishedWork.Complete();
}
// function to close the WorkShop
public async Task CloseShopAsync()
{
// signal that no more work is to be expected:
this.WorkRequests.Complete();
// await until the worker has finished his last job for the day:
await this.FinishedWork.Completion();
}
}
TODO:对 CancellationToken.CancellationRequested
的正确反应 TODO:对工作抛出的异常的正确反应
TODO:决定是否使用多个线程来完成工作
FrontOffice 有一个异步函数,它接受工作,将工作发送到 WorkRequests 并等待工作完成:
public async Task<IWork> OrderWorkAsync(IWork work, CancellationToken token)
{
await this.WorkRequests.SendAsync(work, token);
IWork finishedWork = await this.FinishedWork.ReceivedAsync(token);
return finishedWork;
}
因此,您的进程创建了一个 WorkShop 对象并启动一个或多个将 StartWorking 的线程。
每当任何线程(包括您的主线程)需要以异步等待方式执行一些工作时:
- 创建一个保存输入参数和 DoWork 函数的对象
- 询问 FrontOffice 的研讨会
- 等待 OrderWorkAsync
.
class InformationGetter : IWork
{
public int Id {get; set;} // the input Id
public Data FetchedData {get; private set;} // the result from Remoting.GetData(id);
public void DoWork()
{
this.FetchedData = remoting.GetData(this.Id);
}
}
最后是你的遥控器的异步版本
async Task<Data> RemoteGetDataAsync(int id)
{
// create the job to get the information:
InformationGetter infoGetter = new InformationGetter() {Id = id};
// go to the front office of the workshop and order to do the job
await this.MyWorkShop.FrontOffice.OrderWorkAsync(infoGetter);
return infoGetter.FetchedData;
}