c# - What is the effect of using AsParallel() and AsSequential() in the same query?
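Question
I am reading about PLINQ in a book, and it says:
If you have a complex query that can benefit from parallel processing but also has some parts that should be done sequentially, you can use AsSequential to stop your query from being processed in parallel.
For example: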
var parallelResult = numbers.AsParallel().AsOrdered()
.Where(i => i % 2 == 0).AsSequential();
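I want to understand why this is allowed and what effect it has on the result. Does it run in parallel? Does it run sequentially? Right now it doesn't make any sense to me.
Answer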
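You can conceptualize a LINQ query as an atomic construct with a single execution plan, but it may be more helpful to think of it as a pipeline made of multiple dataflow blocks. The output of each block becomes the input of the next block in the dataflow, and the blocks process items concurrently, as soon as the items become available. Take the following query as an example. It consists of two "blocks", represented by the two Select operators. The first block is configured to process 3 items at a time (in parallel), while the second block is configured to process each item sequentially. Processing one item takes 1000 msec in the parallel block and 500 msec in the sequential block: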
var results = Partitioner
.Create(Enumerable.Range(1, 10), EnumerablePartitionerOptions.NoBuffering)
.AsParallel()
.AsOrdered()
.WithDegreeOfParallelism(3)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(x =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
+ $" [{Thread.CurrentThread.ManagedThreadId}] Parallel #{x}");
Thread.Sleep(1000); // Simulate some CPU-bound work
return x;
})
.AsSequential()
.Select(x =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
+ $" [{Thread.CurrentThread.ManagedThreadId}] Sequential #{x}");
Thread.Sleep(500); // Simulate some CPU-bound work
return x;
})
.ToArray();
Console.WriteLine($"Results: {String.Join(", ", results)}");
If you run this code, you will get output like this:
08:32:17.628 [4] Parallel #2
08:32:17.628 [5] Parallel #1
08:32:17.628 [6] Parallel #3
08:32:18.642 [6] Parallel #5
08:32:18.642 [5] Parallel #4
08:32:18.644 [4] Parallel #6
08:32:18.651 [1] Sequential #1
08:32:19.644 [6] Parallel #7
08:32:19.645 [4] Parallel #8
08:32:19.646 [5] Parallel #9
08:32:19.654 [1] Sequential #2
08:32:20.156 [1] Sequential #3
08:32:20.648 [4] Parallel #10
08:32:20.658 [1] Sequential #4
08:32:21.161 [1] Sequential #5
08:32:21.663 [1] Sequential #6
08:32:22.164 [1] Sequential #7
08:32:22.672 [1] Sequential #8
08:32:23.173 [1] Sequential #9
08:32:23.675 [1] Sequential #10
Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
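Notice how the sequential processing starts before all of the parallel processing has completed. To achieve this effect I used the configuration options EnumerablePartitionerOptions.NoBuffering and ParallelMergeOptions.NotBuffered, which prevent the first block from buffering its input and its output.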
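In the output above, every Sequential line is logged by thread [1], while the Parallel lines come from several worker threads. The sketch below is one way to check that observation directly. It is not part of the original answer: the names parallelThreads and sequentialThreads, the shortened 100 msec delay and the x * 10 projection are illustrative assumptions. It relies on AsSequential() returning a plain IEnumerable<T>, so the operators that follow it are ordinary LINQ to Objects operators that run on the thread enumerating the query.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Thread ids seen by each stage (hypothetical helper collections).
var parallelThreads = new ConcurrentDictionary<int, byte>(); // written from several threads
var sequentialThreads = new HashSet<int>();                  // written only by the enumerating thread

var results = Enumerable.Range(1, 10)
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(3)
    .Select(x =>
    {
        // Runs on PLINQ worker threads.
        parallelThreads.TryAdd(Environment.CurrentManagedThreadId, 0);
        Thread.Sleep(100); // Simulate some work
        return x;
    })
    .AsSequential()
    .Select(x =>
    {
        // Plain Enumerable.Select: runs on the thread that enumerates the query.
        sequentialThreads.Add(Environment.CurrentManagedThreadId);
        return x * 10;
    })
    .ToArray();

Console.WriteLine($"Parallel stage threads: {String.Join(", ", parallelThreads.Keys)}");
Console.WriteLine($"Sequential stage threads: {String.Join(", ", sequentialThreads)}");
Console.WriteLine($"Results: {String.Join(", ", results)}");
```

The thread ids in the first line vary from run to run. The second line should show a single id, the one of the thread that called ToArray(), and because of AsOrdered() the results keep the source order.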
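For completeness, let's rewrite this query using the TPL Dataflow library. The code becomes more verbose and less fluent, but control over the execution becomes more precise, and asynchronous workflows become available (PLINQ is not async-friendly):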
var block1 = new TransformBlock<int, int>(async x =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
+ $" [{Thread.CurrentThread.ManagedThreadId}] Parallel #{x}");
await Task.Delay(1000); // Simulate some I/O operation
return x;
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 3,
EnsureOrdered = true // redundant since EnsureOrdered is the default
});
var block2 = new TransformBlock<int, int>(async x =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
+ $" [{Thread.CurrentThread.ManagedThreadId}] Sequential #{x}");
await Task.Delay(500); // Simulate some I/O operation
return x;
}); // MaxDegreeOfParallelism = 1 is the default
block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true });
// Feeding the first block
foreach (var x in Enumerable.Range(1, 10))
{
await block1.SendAsync(x);
}
block1.Complete();
var results = new List<int>(); // Collecting the results is a bit painful
while (await block2.OutputAvailableAsync())
{
while (block2.TryReceive(out var result))
{
results.Add(result);
}
}
await block2.Completion;
Console.WriteLine($"Results: {String.Join(", ", results)}");
Output:
08:59:25.102 [6] Parallel #2
08:59:25.102 [4] Parallel #1
08:59:25.102 [7] Parallel #3
08:59:26.127 [7] Parallel #4
08:59:26.129 [6] Parallel #5
08:59:26.143 [4] Parallel #6
08:59:26.147 [5] Sequential #1
08:59:26.648 [5] Sequential #2
08:59:27.129 [6] Parallel #7
08:59:27.129 [7] Parallel #8
08:59:27.144 [4] Parallel #9
08:59:27.149 [5] Sequential #3
08:59:27.650 [5] Sequential #4
08:59:28.131 [6] Parallel #10
08:59:28.152 [5] Sequential #5
08:59:28.653 [5] Sequential #6
08:59:29.155 [5] Sequential #7
08:59:29.659 [5] Sequential #8
08:59:30.160 [5] Sequential #9
08:59:30.674 [5] Sequential #10
Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
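The comment in the dataflow code notes that collecting the results is a bit painful. As a hedged alternative, and assuming a target framework where the DataflowBlock.ReceiveAllAsync extension method is available (.NET Core 3.0 or later, to my knowledge), the nested OutputAvailableAsync/TryReceive loop can be replaced with an await foreach. This is a minimal sketch, not the original author's code: the block names are hypothetical, the delays are shortened and the logging is omitted.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// First block: up to 3 items in flight at a time; output order is preserved by default.
var parallelBlock = new TransformBlock<int, int>(async x =>
{
    await Task.Delay(100); // Simulate some I/O operation
    return x;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

// Second block: default MaxDegreeOfParallelism = 1, so one item at a time.
var sequentialBlock = new TransformBlock<int, int>(async x =>
{
    await Task.Delay(50); // Simulate some I/O operation
    return x;
});

parallelBlock.LinkTo(sequentialBlock,
    new DataflowLinkOptions { PropagateCompletion = true });

// Feeding the first block
for (int x = 1; x <= 10; x++)
{
    await parallelBlock.SendAsync(x);
}
parallelBlock.Complete();

// Collecting the results: the loop ends when the block completes and is empty.
var collected = new List<int>();
await foreach (var item in sequentialBlock.ReceiveAllAsync())
{
    collected.Add(item);
}
await sequentialBlock.Completion; // Propagates any exception from the pipeline

Console.WriteLine($"Results: {String.Join(", ", collected)}");
```

Awaiting Completion after the loop is kept on purpose, so that an exception thrown inside either block surfaces instead of being silently lost.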