首页 > 解决方案 > LINQ 恢复顺序后并行处理

问题描述

是否可以并行化 LINQ 处理链,但然后以原始顺序将结果拼接在一起?

所以对于这个管道:

var results = 
_fileReader.ReadFiles(inFolder)         
            .OrderBy(s => s.CreateDate).ThenBy(s => s.FileName)
            .Select(s => new ProcessedFile(s, isWriteSuccessful: ExecuteWrite(s, connectionString, logger)));
  1. 从文件夹中读取所有文件,
  2. 按时间顺序排列它们,
  3. 然后按顺序为每个文件

(a) 执行一些处理逻辑(例如提取消息)
(b) 将结果写入 db

注意:在这种特殊情况下,按时间顺序处理文件很重要,以便IDENTITY按时间顺序分配 SQL ID

但是如果指定并行处理,那么测试表明写入文件的顺序是不确定的:

_fileReader.ReadFiles(inFolder)
           .AsParallel()
           .OrderBy(s => s.CreateDate).ThenBy(s => s.FileName)
           .Select(s => new ProcessedFile(s, isWriteSuccessful: ExecuteWrite(s, connectionString, logger)));

但也许有一些方法可以强制缓存并行处理后的结果集,一旦所有文件处理完成,就按原始顺序将它缝合在一起(根据OrderBy)?

标签: linqparallel-processingtask-parallel-library

解决方案


您可以使用这样的东西来创建包含原始值及其原始序列的项目集合:

public class Sequenced<T>
{
    public int Sequence { get; }
    public T Value { get; }

    internal Sequenced(int sequence, T value)
    {
        Sequence = sequence;
        Value = value;
    }
}

public static class SequencedExtensions
{
    public static IEnumerable<Sequenced<T>> AsSequenced<T>(this IEnumerable<T> source)
    {
        var sequence = -1;
        foreach (var item in source)
        {
            yield return new Sequenced<T>(++sequence, item);
        }
    }
}

现在这将为您提供一个可以重新排序回其原始序列的集合:

var files = _fileReader.ReadFiles(inFolder)         
        .OrderBy(s => s.CreateDate).ThenBy(s => s.FileName)
        .AsSequenced();

完成后,按顺序排列项目Sequence,然后插入数据库。


推荐阅读