c# - 如何停止传播异步流 (IAsyncEnumerable)
问题描述
我有一个接受IAsyncEnumerable
作为参数的方法,并且还返回一个IAsyncEnumerable
. 它为输入流中的每个项目调用一个 Web 方法,并将结果传播到输出流。我的问题是,如果我的方法的调用者已停止枚举输出流,如何通知我,以便我可以停止枚举我的方法中的输入流?似乎我应该能够收到通知,因为调用者默认处理IAsyncEnumerator
从我的方法中获取的内容。是否有任何内置机制为编译器生成的异步方法生成这样的通知?如果不是,最容易实施的替代方案是什么?
例子。Web 方法验证 url 是否有效。提供了一个永无止境的 url 流,但是当发现超过 2 个无效 url 时调用者停止枚举结果:
var invalidCount = 0;
await foreach (var result in ValidateUrls(GetMockUrls()))
{
Console.WriteLine($"Url {result.Url} is "
+ (result.IsValid ? "OK" : "Invalid!"));
if (!result.IsValid) invalidCount++;
if (invalidCount > 2) break;
}
Console.WriteLine($"--Async enumeration finished--");
await Task.Delay(2000);
网址的生成器。每 300 毫秒生成一个 url。
private static async IAsyncEnumerable<string> GetMockUrls()
{
int index = 0;
while (true)
{
await Task.Delay(300);
yield return $"https://mock.com/{++index:0000}";
}
}
url 的验证器。需要急切地枚举输入流,因此两个异步工作流并行运行。第一个工作流程将 url 插入队列中,第二个工作流程逐个挑选 url 并验证它们。ABufferBlock
用作异步队列。
private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
this IAsyncEnumerable<string> urls)
{
var buffer = new System.Threading.Tasks.Dataflow.BufferBlock<string>();
_ = Task.Run(async () =>
{
await foreach (var url in urls)
{
Console.WriteLine($"Url {url} received");
await buffer.SendAsync(url);
}
buffer.Complete();
});
while (await buffer.OutputAvailableAsync() && buffer.TryReceive(out var url))
{
yield return (url, await MockValidateUrl(url));
}
}
澄清:队列是强制性的,删除它不是一种选择。它是这个问题的一个重要组成部分。
单个 url 的验证器。验证过程平均持续 300 毫秒。
private static Random _random = new Random();
private static async Task<bool> MockValidateUrl(string url)
{
await Task.Delay(_random.Next(100, 600));
return _random.Next(0, 2) != 0;
}
输出:
Url https://mock.com/0001 received
Url https://mock.com/0001 is Invalid!
Url https://mock.com/0002 received
Url https://mock.com/0003 received
Url https://mock.com/0002 is OK
Url https://mock.com/0004 received
Url https://mock.com/0003 is Invalid!
Url https://mock.com/0005 received
Url https://mock.com/0004 is OK
Url https://mock.com/0005 is OK
Url https://mock.com/0006 received
Url https://mock.com/0006 is Invalid!
--Async enumeration finished--
Url https://mock.com/0007 received
Url https://mock.com/0008 received
Url https://mock.com/0009 received
Url https://mock.com/0010 received
Url https://mock.com/0011 received
Url https://mock.com/0012 received
...
问题是在调用者/客户端完成异步枚举后仍然会生成和接收 url。我想解决这个问题,以便在--Async enumeration finished--
.
解决方案
编辑
使用适当的示例,讨论会更容易。验证 URL 并不昂贵。如果您需要点击例如 100 个 URL 并选择前 3 个响应怎么办?
在这种情况下,工作人员和缓冲区都有意义。
编辑 2
其中一条评论增加了额外的复杂性——任务是同时执行的,结果需要在它们到达时发出。
对于初学者,ValidateUrl
可以重写为迭代器方法:
private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
this IAsyncEnumerable<string> urls)
{
await foreach (var url in urls)
{
Console.WriteLine($"Url {url} received");
var isValid=await MockValidateUrl(url);
yield return (url, isValid);
}
}
不需要工作任务,因为所有方法都是异步的。除非消费者要求结果,否则迭代器方法不会继续。即使MockValidateUrl
做一些昂贵的事情,它也可以使用 aTask.Run
本身或包裹在Task.Run
. 不过,这会产生相当多的任务。
为了完整起见,您可以添加一个CancellationToken
and ConfigureAwait(false)
:
public static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
IAsyncEnumerable<string> urls,
[EnumeratorCancellation]CancellationToken token=default)
{
await foreach(var url in urls.WithCancellation(token).ConfigureAwait(false))
{
var isValid=await MockValidateUrl(url).ConfigureAwait(false);
yield return (url,isValid);
}
}
无论如何,只要调用者停止迭代,ValidateUrls
就会停止。
缓冲
缓冲是一个问题——无论它是如何编程的,工作人员在缓冲区填满之前都不会停止。缓冲区的大小是工作人员在意识到需要停止之前将进行多少次迭代。这对于 Channel 来说是一个很好的案例(是的,再次!):
public static IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
IAsyncEnumerable<string> urls,CancellationToken token=default)
{
var channel=Channel.CreateBounded<(string Url, bool IsValid)>(2);
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach(var url in urls.WithCancellation(token))
{
var isValid=await MockValidateUrl(url);
await writer.WriteAsync((url,isValid));
}
},token)
.ContinueWith(t=>writer.Complete(t.Exception));
return channel.Reader.ReadAllAsync(token);
}
不过,最好传递 ChannelReaders 而不是 IAsyncEnumerables。至少,在有人尝试从 ChannelReader 读取之前,不会构造异步枚举器。将管道构建为扩展方法也更容易:
public static ChannelReader<(string Url, bool IsValid)> ValidateUrls(
this ChannelReader<string> urls,int capacity,CancellationToken token=default)
{
var channel=Channel.CreateBounded<(string Url, bool IsValid)>(capacity);
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach(var url in urls.ReadAllAsync(token))
{
var isValid=await MockValidateUrl(url);
await writer.WriteAsync((url,isValid));
}
},token)
.ContinueWith(t=>writer.Complete(t.Exception));
return channel.Reader;
}
这种语法允许以流畅的方式构建管道。假设我们有这个辅助方法可以将 IEnumerables 转换为通道(或 IAsyncEnumerables):
public static ChannelReader<T> AsChannel(
IEnumerable<T> items)
{
var channel=Channel.CreateUnbounded();
var writer=channel.Writer;
foreach(var item in items)
{
channel.TryWrite(item);
}
return channel.Reader;
}
我们可以写:
var pipeline=urlList.AsChannel() //takes a list and writes it to a channel
.ValidateUrls();
await foreach(var (url,isValid) in pipeline.ReadAllAsync())
{
//Use the items here
}
立即传播的并发调用
使用通道很容易,尽管此时工作人员需要立即触发所有任务。本质上,我们需要多个工人。这不是仅使用 IAsyncEnumerable 就可以完成的。
首先,如果我们想使用例如 5 个并发任务来处理我们可以编写的输入
var tasks = Enumerable.Range(0,5).
.Select(_ => Task.Run(async ()=>{
///
},token));
_ = Task.WhenAll(tasks)(t=>writer.Complete(t.Exception));
代替 :
_ = Task.Run(async ()=>{
///
},token)
.ContinueWith(t=>writer.Complete(t.Exception));
使用大量工人就足够了。我不确定 IAsyncEnumerable 是否可以被多个工作人员使用,我也不想知道。
提前取消
如果客户端消耗所有结果,则上述所有工作。但是,要在前 5 个结果之后停止处理,我们需要 CancellationToken :
var cts=new CancellationTokenSource();
var pipeline=urlList.AsChannel() //takes a list and writes it to a channel
.ValidateUrls(cts.Token);
int i=0;
await foreach(var (url,isValid) in pipeline.ReadAllAsync())
{
//Break after 3 iterations
if(i++>2)
{
break;
}
....
}
cts.Cancel();
此代码本身可以在接收 ChannelReader 的方法中提取,在本例中为 CancellationTokenSource :
static async LastStep(this ChannelReader<(string Url, bool IsValid)> input,CancellationTokenSource cts)
{
int i=0;
await foreach(var (url,isValid) in pipeline.ReadAllAsync())
{
//Break after 3 iterations
if(i++>2)
{
break;
}
....
}
cts.Cancel();
}
管道变为:
var cts=new CancellationTokenSource();
var pipeline=urlList.AsChannel()
.ValidateUrls(cts.Token)
.LastStep(cts);
推荐阅读
- python - 关键字 args 到字典
- material-ui - React-Admin CSS 主题覆盖较早放置并被 MaterialUI 覆盖
- c# - 计算下一个字符串的函数
- java - 无法构建 Hibernate SessionFactory - 无法实例化测试对象
- java - 如何指定对外部应用服务器进行基准测试的并发级别?
- vim - 如何在 vimspector 中调试 java 外部类?
- java - 如何使用捆绑 Android Studio 将图像传输到其他活动
- html - 电子 wix 安装程序图标
- reactjs - 使用 react redux 从不同的 axios api 获取多个数据
- pandas - 比较 pandas 数据框中的所有列以对不同的场景进行分类