Creating savepoints and starting from a specific node in Apache Beam

Problem description

Creating savepoints

Currently, savepoints are created by writing the output of each transformation to a CSV file. The general setup looks like this:

[Figure: savepoint pipeline]

I do this with a wrapper transform that takes a transformation (applied with beam.ParDo) and an optional savepoint parameter. When a savepoint is provided, the wrapper also writes the transformed data to it:

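import apache_beam as beam


class SavePointNode(beam.PTransform):
    def __init__(self, transform, savepoint=None, **kwargs):
        # call PTransform.__init__; super(beam.PTransform, self) would skip it
        super().__init__(**kwargs)
        self.transform = transform  # a beam.DoFn instance
        self.savepoint = savepoint  # output path prefix, or None

    def expand(self, pcoll):

        # transform data
        result = pcoll | beam.ParDo(self.transform)

        # write to the savepoint if available, as a side branch
        if self.savepoint:
            _ = result | beam.io.WriteToText(self.savepoint)

        # return the transformed data (not the write result) so nodes can be chained
        return result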
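By default, WriteToText writes each element as one line of text using the element's string form, so a savepoint holding dicts or tuples cannot be parsed back reliably when it is loaded later. A minimal sketch, swapping the CSV format for JSON lines and assuming every element is JSON-serializable, is a pair of hypothetical helpers (encode_record and decode_record are not part of Beam) that encode each element before writing and decode it after reading:

```python
import json
from typing import Any


def encode_record(element: Any) -> str:
    """Serialize one element to a single JSON line for a text savepoint (hypothetical helper)."""
    # json.dumps without indent never emits raw newlines (they are escaped as \n),
    # so each element stays on exactly one line; sort_keys makes output deterministic.
    return json.dumps(element, sort_keys=True)


def decode_record(line: str) -> Any:
    """Parse one line read back from a text savepoint (hypothetical helper)."""
    return json.loads(line)
```

In the wrapper this could be applied as result | beam.Map(encode_record) | beam.io.WriteToText(self.savepoint) when writing, and as beam.Map(decode_record) right after beam.io.ReadFromText when loading. JSON turns tuples into lists, so steps that expect tuples would need to convert them back.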

My first question is the following:

Loading savepoints

By creating savepoints, I want to be able to restart the pipeline at a specific node and save time by not recomputing every step up to that savepoint. For example, if an error occurred in the third transform step:

[Figure: error in the third transform step]

I want to be able to restart the pipeline from the savepoint created by the second transform step:

[Figure: restart at the second savepoint]
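One way to plan such a restart is sketched below under assumptions that are not in the question: each node is described by a hypothetical Step record holding its name and the savepoint prefix passed to SavePointNode, and a savepoint counts as available if any file matches prefix*. This check does not verify that the files are complete. The helpers only pick the savepoint to load and the steps that still need to run; they do not build the Beam pipeline themselves.

```python
import glob
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple


@dataclass
class Step:
    """One pipeline node (hypothetical): a name and an optional savepoint prefix."""
    name: str
    savepoint: Optional[str] = None


def _files_exist(prefix: str) -> bool:
    # WriteToText appends shard suffixes to the prefix, so match prefix*
    return bool(glob.glob(prefix + "*"))


def last_saved_step(steps: List[Step],
                    exists: Callable[[str], bool] = _files_exist) -> Optional[str]:
    """Return the name of the furthest step whose savepoint exists, or None."""
    for step in reversed(steps):
        if step.savepoint and exists(step.savepoint):
            return step.name
    return None


def plan_resume(steps: List[Step], resume_after: str) -> Tuple[str, List[Step]]:
    """Return (savepoint prefix to read, steps that still have to run).

    Every step up to and including `resume_after` is skipped.
    """
    names = [s.name for s in steps]
    if resume_after not in names:
        raise ValueError(f"unknown step: {resume_after!r}")
    idx = names.index(resume_after)
    savepoint = steps[idx].savepoint
    if savepoint is None:
        raise ValueError(f"step {resume_after!r} has no savepoint")
    return savepoint, steps[idx + 1:]
```

For the example above, if the third step failed, last_saved_step would return the second step's name, and plan_resume would return its savepoint prefix plus only the third step. In the restarted pipeline, that prefix could be read with beam.io.ReadFromText(prefix + "*"), since WriteToText by default names its files prefix-00000-of-00001 and so on. The remaining steps would then be applied, for example by wrapping each one in SavePointNode again. Text files only hold strings, so elements have to be parsed back into their original form after reading.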

My questions regarding this are the following:

Tags: python, apache-beam
