首页 > 解决方案 > 文件异步到达的相关数据处理管道

问题描述

我有几个依赖于数据的任务/管道,其中一些依赖于另一个的完成。更难的是数据可以异步到达,这意味着某些任务需要等到上一步中的所有文件或任务都处理完。

这是一个例子:

假设我们有一个x[i,j]带有索引的原始文件,其中i代表主类别中的一个特定子类别j

我需要运行以下管道:

  1. 管道1:清理原始文件x[i,j]并将其存储为x_clean[i,j]
  2. 管道2:一旦管道 1 为所有i内部完成j,汇总结果x_clean[i,j]并将其存储为y_clean[j]
  3. 管道3:清理原始文件z[j]并将其存储为z_clean[j]
  4. 管道4:一旦管道 2 和管道 3 完成,将其合并z_clean[j]y_clean[j]存储为w_clean[j].

我可以应用什么样的模型来处理这种数据流方法?这种数据处理任务背后有什么方法吗?GCP 是否为此类问题构建了一些东西?

标签: architectureworkflowpipelinedataflowdata-processing

解决方案


在生产过程中...

  • 步骤取决于其他步骤的完成情况。

  • 材料可以异步到达,这意味着后续步骤将等待产品到达进行处理。但是,请注意,这并不意味着无限制的材料会失控,只有特定制造订单要消耗的材料。如果您的场景允许无限数据流涌入,那么您必须对其进行预处理,以避免混合不同的产品组件。不要破坏流程的结构来尝试处理某个缓冲区或其他任何东西中的异步到达数据,因为制造数据产品涉及关系数据而不是原材料。

  • 子组件可以在加入分支中完成,这意味着组装步骤在组装开始之前等待相关组件的协调集到达。

我是 POWER 的创建者,这是迄今为止唯一的协作(制造)架构。关于这个主题有很多东西要学,但是你可以在网上找到我的文章和代码: http ://www.powersemantics.com/

以下是您的流程在制造工作模型中的样子:

    class MyProduct
    {
        public object[i,j] x_clean { get; set; }
        public object[j] y_clean { get; set; }
        public object[j] z_clean { get; set; }
        // final product
        public object[j] w_clean { get; set; }
    }
    class MyProcess : Producer<MyProduct>, IProcess, IMachine, IOrganize
    {
        // process inputs
        public object[i,j] x { get; set; }  // raw file
        public object[j] z { get; set; } // raw file

        // machines
        public CleanerA Cleaner1 { get; set; }
        public Aggregator Aggregator1 { get; set }
        public CleanerB Cleaner2 { get; set; }
        public Assembler Assembler1 { get; set; }

        public void D() { // instantiates properties and machines }
        public void O()
        {
            // bind machines to work on the same data points
            // allows maintenance to later remove cleaners if it becomes possible
            // for the process to receive data in the correct form
            Cleaner1.x = x;
            Cleaner1.Product.x_clean = Product.x_clean;

            Aggregator1.x_clean = Product.x_clean;
            Aggregator1.Product.y_clean = Product.y_clean;

            Cleaner2.z = z;
            Cleaner2.Product.z_clean = Product.z_clean;

            Assembler1.z_clean = Product.z_clean;
            Assembler1.y_clean = Product.y_clean;
            Assembler1.Product.w_clean = Product.w_clean;
        }

        // hardcoded synchronous controller
        public void M()
        {
            Cleaner1.M();
            Aggregator1.M();
            Cleaner2.M();
            Assembler1.M();
        }
    }

    // these class pairs are Custom Machines, very specific work organized
    // by user requirements rather than in terms of domain-specific operations
    class CleanerAProduct
    {
        public object[i,j] x_clean { get; set; }
    }
    class CleanerA: Producer<CleanerAProduct>, IMachine
    {
        public object[i,j] x { get; set; }  // raw file
        public void M()
        {
            // clean the raw file x[i,j] and store it as x_clean[i,j]
        }
    }


    class AggregatorProduct
    {
        public object[j] y_clean { get; set; }
    }
    class Aggregator: Producer<AggregatorProduct>, IMachine
    {
        public object[i,j] x_clean { get; set; }
        public void M()
        {
            // aggregate the results from x_clean[i,j] and store it as y_clean[j]
        }
    }


    class CleanerBProduct
    {
        public object[j] z_clean { get; set; }
    }
    class CleanerB : Producer<CleanerBProduct>, IMachine
    {
        public object[j] z { get; set; }
        public void M()
        {
            // clean a raw file z[j] and store it as z_clean[j]
        }
    }


    class AssemblerProduct
    {
        public object[j] w_clean { get; set; }
    }
    class Assembler : Producer<AssemblerProduct>, IMachine
    {
        public object[j] y_clean { get; set; }
        public object[j] z_clean { get; set; }
        public void M()
        {
            // combine z_clean[j] and y_clean[j] and store it as w_clean[j]
        }
    }

生产过程类的正常使用:

  1. 实例化。调用 D() 来实例化机器和产品。
  2. 将任何输入分配给流程。
  3. 调用 O() 让进程将这些输入分配给机器,并绑定机器以在最终产品上运行。这是您在生产前覆盖这些分配的最后机会。
  4. 调用 M() 来执行该过程。

大多数源代码在同一个函数体中将生产者和消费者焊接在一起,因此以后维护变得很痛苦,然后函数将数据通过电子邮件发送给彼此,就像没有电子邮件跟踪的无用办公室工作人员一样。当您稍后想要做出垂直集成决策(例如更换机器或扩展流程)时,这会导致问题,所有这些我都用源记录。POWER 是唯一避免集中化等复杂性的架构。我在二月份发布了它。

有 ETL 工具和其他解决方案,如 TPL Dataflow,但生产流程不会为程序员组织或管理自己。所有程序员都需要学习 POWER 以正确处理浪费、集成、控制和仪表的职责。当我们编写自动化代码然后无法立即停止实时执行时,雇主看着我们很有趣,但我们的教育只是让我们准备创建流程,而不是像制造那样构建流程。


推荐阅读