python - 在气流自定义操作员之间传输数据
问题描述
我正在气流中创建自己的自定义运算符,并希望将一个运算符的输出用作另一个运算符的输入。目前我正在做的方式是将输出存储在 s3 中,并在下一个运算符中从 s3 中读取它们,这似乎是一种有效的方式。我遇到了以下帖子气流和运营商之间的数据传输, 但我不太清楚。如果有人能给我一个运营商之间数据传输的好例子,我将不胜感激。
class SFScoreDataOperator(BaseOperator):
@apply_defaults
def __init__(self, aws_key, aws_secret,
model_version='2020-04-v2',
*args, **kwargs):
self.__version__ = model_version
self.aws_key = aws_key
self.aws_secret = aws_secret
super(SFScoreDataOperator, self).__init__(*args, **kwargs)
def execute(self, context):
SuccessFactorData(aws_key=self.aws_key, aws_secret=self.aws_secret, to_random_shuffle=False).get_data()
class SFScoreGetTrainOperator(BaseOperator):
@apply_defaults
def __init__(self, aws_key, aws_secret,
model_version='2020-04-v2',
*args, **kwargs):
self.__version__ = model_version
self.aws_key = aws_key
self.aws_secret = aws_secret
super(SFScoreGetTrainOperator, self).__init__(*args, **kwargs)
def execute(self, context):
CM = SuccessFactorTrain(aws_key=self.aws_key, aws_secret=self.aws_secret)
df_to_predict = CM.fetch_predict_data()
df_train_test = CM.fetch_final_data()
CM.train_test(final_df_for_train_test=df_train_test, segment_type=None)
class SuccessFactorDataPluginV2(AirflowPlugin):
name = 'success_factor_data_plugin_v2'
operators = [SFScoreDataOperator]
class SuccessFactorTrainPluginV2(AirflowPlugin):
name = 'success_factor_train_plugin_v2'
operators = [SFScoreGetTrainOperator]
类 SFScoreDataOperator 中的类方法 get_data 输出两个表,它们是 SFScoreGetTrainOperator 中 train_test 方法的输入,而 train_test 方法输出 3 个变量,这些变量将作为下一个自定义运算符的输入。并非所有输出都是 CSV 格式,所以我无法将它们写入 s3。我阅读了 XCom 文档,但不确定如何实现它,所以如果我能得到推动,我将不胜感激。谢谢!
解决方案
推荐阅读
- magento - Magento 1.9 - Zend/Currency - 格式错误...缺少 0
- python-3.x - 将长格式日期转换为短格式
- redis - 如何获取设置的redis db?
- c# - 如何在 Visual Studio 中使用 localDB(MDF) 在数据库中重新设置标识列
- javascript - 文件上传异步功能不等待
- node.js - 如何在节点js中循环下载大量文件?
- android - 如何将我的 fitbit 数据集成到我自己的 Android 应用程序中?
- android - Firebase Vision Labeler 返回不一致的结果
- angular - 如何在 Angular Project 中捆绑字体,使其不应该从互联网上下载?
- c# - 将 BEMCheckBox 绑定到 ViewModel 中的问题 - MvvmCross、Xamarin.iOS