首页 > 解决方案 > 在同一个查询中使用 AsParallel() 和 AsSequential() 的影响?C#

问题描述

我正在阅读其中一本书中的 PLINQ,它说:

如果您有一个可以从并行处理中受益的复杂查询,但也有一些应该按顺序完成的部分,您可以使用 AsSequential 来阻止您的查询被并行处理。

例如:

var parallelResult = numbers.AsParallel().AsOrdered()
    .Where(i => i % 2 == 0).AsSequential();

我想了解为什么允许这样做以及对结果有什么影响?它是并行运行的吗?它是按顺序运行的吗?现在没有任何意义。

标签: c#.netvisual-studiolinqplinq

解决方案


您可以将 LINQ 查询概念化为具有单个执行计划的原子构造,但将其概念化为由多个数据流块组成的管道可能更有帮助。每个块的输出成为数据流中下一个块的输入,一旦它们可用,这些块就会同时处理项目。以下一个查询为例,它由两个Select运算符表示的两个“块”组成。第一个块被配置为一次处理 3 个项目(并行),而第二个块被配置为顺序处理每个项目。每个项目的处理持续时间对于并行块为 1000 毫秒,对于顺序块为 500 毫秒:

var results = Partitioner
    .Create(Enumerable.Range(1, 10), EnumerablePartitionerOptions.NoBuffering)
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(3)
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select(x =>
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
            + $" [{Thread.CurrentThread.ManagedThreadId}] Parallel #{x}");
        Thread.Sleep(1000); // Simulate some CPU-bound work
        return x;
    })
    .AsSequential()
    .Select(x =>
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
            + $" [{Thread.CurrentThread.ManagedThreadId}] Sequential #{x}");
        Thread.Sleep(500); // Simulate some CPU-bound work
        return x;
    })
    .ToArray();
Console.WriteLine($"Results: {String.Join(", ", results)}");

如果您运行此代码,您将获得如下输出:

08:32:17.628 [4] Parallel #2
08:32:17.628 [5] Parallel #1
08:32:17.628 [6] Parallel #3
08:32:18.642 [6] Parallel #5
08:32:18.642 [5] Parallel #4
08:32:18.644 [4] Parallel #6
08:32:18.651 [1] Sequential #1
08:32:19.644 [6] Parallel #7
08:32:19.645 [4] Parallel #8
08:32:19.646 [5] Parallel #9
08:32:19.654 [1] Sequential #2
08:32:20.156 [1] Sequential #3
08:32:20.648 [4] Parallel #10
08:32:20.658 [1] Sequential #4
08:32:21.161 [1] Sequential #5
08:32:21.663 [1] Sequential #6
08:32:22.164 [1] Sequential #7
08:32:22.672 [1] Sequential #8
08:32:23.173 [1] Sequential #9
08:32:23.675 [1] Sequential #10
Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10

注意在所有并行处理完成之前顺序处理是如何开始的。为了达到这个效果,我使用了配置选项EnumerablePartitionerOptions.NoBufferingParallelMergeOptions.NotBuffered, 来防止第一个块缓冲它的输入和输出。

为了完整起见,让我们使用TPL 数据流库重写此查询。代码变得更加冗长和流畅,但执行的控制变得更加精确,并且异步工作流变得可用(PLINQ 对异步不友好):

var block1 = new TransformBlock<int, int>(async x =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
        + $" [{Thread.CurrentThread.ManagedThreadId}] Parallel #{x}");
    await Task.Delay(1000); // Simulate some I/O operation
    return x;
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 3,
    EnsureOrdered = true // redundant since EnsureOrdered is the default
});

var block2 = new TransformBlock<int, int>(async x =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
        + $" [{Thread.CurrentThread.ManagedThreadId}] Sequential #{x}");
    await Task.Delay(500); // Simulate some I/O operation
    return x;
}); // MaxDegreeOfParallelism = 1 is the default

block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true });

// Feeding the first block
foreach (var x in Enumerable.Range(1, 10))
{
    await block1.SendAsync(x);
}
block1.Complete();

var results = new List<int>(); // Collecting the results is a bit painful
while (await block2.OutputAvailableAsync())
{
    while (block2.TryReceive(out var result))
    {
        results.Add(result);
    }
}
await block2.Completion;
Console.WriteLine($"Results: {String.Join(", ", results)}");

输出:

08:59:25.102 [6] Parallel #2
08:59:25.102 [4] Parallel #1
08:59:25.102 [7] Parallel #3
08:59:26.127 [7] Parallel #4
08:59:26.129 [6] Parallel #5
08:59:26.143 [4] Parallel #6
08:59:26.147 [5] Sequential #1
08:59:26.648 [5] Sequential #2
08:59:27.129 [6] Parallel #7
08:59:27.129 [7] Parallel #8
08:59:27.144 [4] Parallel #9
08:59:27.149 [5] Sequential #3
08:59:27.650 [5] Sequential #4
08:59:28.131 [6] Parallel #10
08:59:28.152 [5] Sequential #5
08:59:28.653 [5] Sequential #6
08:59:29.155 [5] Sequential #7
08:59:29.659 [5] Sequential #8
08:59:30.160 [5] Sequential #9
08:59:30.674 [5] Sequential #10
Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10

推荐阅读