google-cloud-platform - 如何传递动态参数气流运算符?
问题描述
我正在使用 Airflow 在 Google Cloud Composer 上运行 Spark 作业。我需要
- 创建集群(用户提供的 YAML 参数)
- 火花作业列表(作业参数也由每个作业 YAML 提供)
使用 Airflow API - 我可以读取 YAML 文件,并使用 xcom 跨任务推送变量。
但是,考虑到DataprocClusterCreateOperator()
cluster_name
project_id
zone
和其他一些参数被标记为模板。
如果我想将其他参数作为模板传递(目前不是这样)怎么办?- 像image_version
,
num_workers
等等worker_machine_type
?
有什么解决方法吗?
解决方案
不确定“动态”是什么意思,但是当 yaml 文件更新时,如果读取文件过程在 dag 文件正文中,则 dag 将被刷新以从 yaml 文件中申请新的 args。所以实际上,您不需要 XCOM 来获取参数。只需简单地创建一个 params 字典,然后传递给 default_args:
CONFIGFILE = os.path.join(
os.path.dirname(os.path.realpath(\__file__)), 'your_yaml_file')
with open(CONFIGFILE, 'r') as ymlfile:
CFG = yaml.load(ymlfile)
default_args = {
'cluster_name': CFG['section_A']['cluster_name'], # edit here according to the structure of your yaml file.
'project_id': CFG['section_A']['project_id'],
'zone': CFG['section_A']['zone'],
'mage_version': CFG['section_A']['image_version'],
'num_workers': CFG['section_A']['num_workers'],
'worker_machine_type': CFG['section_A']['worker_machine_type'],
# you can add all needs params here.
}
DAG = DAG(
dag_id=DAG_NAME,
schedule_interval=SCHEDULE_INTEVAL,
default_args=default_args, # pass the params to DAG environment
)
Task1 = DataprocClusterCreateOperator(
task_id='your_task_id',
dag=DAG
)
但是如果你想要动态的 dag 而不是参数,你可能需要像这样的其他策略。
所以你可能需要弄清楚基本概念:动态在哪个级别?任务级别?DAG级别?
或者您可以创建自己的 Operator 来完成这项工作并获取参数。
推荐阅读
- r - Rstudios sarima 只显示地块
- bluetooth-lowenergy - 在 iOS 中监听多个连接的 BLE 设备的特征值变化
- ruby-on-rails - 更新模型的所有道具
- python - 在 UI 上为 QLineEdit 的内容加下划线
- matrix - 如何为满足一定条件的矩阵元素赋值?
- laravel - Heroku 网站部署错误:“禁止您无权访问此资源。”
- docker - Docker 主机接口没有清理?
- r - 使用 map2 和模型映射预测
- c# - 使用 Visual Studio 2015 Windows Form App 建立与 SQL Server 的连接
- linux - 调用主脚本的 Systemd 单元文件