首页 > 解决方案 > 在气流自定义操作员之间传输数据

问题描述

我正在气流中创建自己的自定义运算符,并希望将一个运算符的输出用作另一个运算符的输入。目前我正在做的方式是将输出存储在 s3 中,并在下一个运算符中从 s3 中读取它们,这似乎是一种有效的方式。我遇到了以下帖子气流和运营商之间的数据传输, 但我不太清楚。如果有人能给我一个运营商之间数据传输的好例子,我将不胜感激。

class SFScoreDataOperator(BaseOperator):
    @apply_defaults
    def __init__(self, aws_key, aws_secret,
                 model_version='2020-04-v2',
                 *args, **kwargs):
        self.__version__ = model_version
        self.aws_key = aws_key
        self.aws_secret = aws_secret
        super(SFScoreDataOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        SuccessFactorData(aws_key=self.aws_key, aws_secret=self.aws_secret, to_random_shuffle=False).get_data()


class SFScoreGetTrainOperator(BaseOperator):
    @apply_defaults
    def __init__(self, aws_key, aws_secret,
                 model_version='2020-04-v2',
                 *args, **kwargs):
        self.__version__ = model_version
        self.aws_key = aws_key
        self.aws_secret = aws_secret
        super(SFScoreGetTrainOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        CM = SuccessFactorTrain(aws_key=self.aws_key, aws_secret=self.aws_secret)
        df_to_predict = CM.fetch_predict_data()
        df_train_test = CM.fetch_final_data()
        CM.train_test(final_df_for_train_test=df_train_test, segment_type=None)

class SuccessFactorDataPluginV2(AirflowPlugin):
    name = 'success_factor_data_plugin_v2'
    operators = [SFScoreDataOperator]


class SuccessFactorTrainPluginV2(AirflowPlugin):
    name = 'success_factor_train_plugin_v2'
    operators = [SFScoreGetTrainOperator]

类 SFScoreDataOperator 中的类方法 get_data 输出两个表,它们是 SFScoreGetTrainOperator 中 train_test 方法的输入,而 train_test 方法输出 3 个变量,这些变量将作为下一个自定义运算符的输入。并非所有输出都是 CSV 格式,所以我无法将它们写入 s3。我阅读了 XCom 文档,但不确定如何实现它,所以如果我能得到推动,我将不胜感激。谢谢!

标签: pythonairflow

解决方案


推荐阅读