airflow - 气流 python 通过 XCOM 处理返回值
问题描述
我正在尝试创建如下所述的 Airflow dag:我有一个非常大的 python 代码,它最终会创建一个文件。该文件是使用特定名称创建的,例如sales20180802130200.json
以下气流任务是s3BucketUpload
操作员。它需要获取文件名才能上传到 s3。
第一个 python 文件可能由bashOperator
. 它如何使用文件名创建 Xcom 密钥?有没有其他方法可以传递价值?
谢谢沙比
解决方案
只要路径参数包含在运算符类中的模板参数中,Airflow 就会为您插入值。我不知道 s3BucketUpload 运算符的样子,所以我假设参数名称。
class s3BucketUploadOperator(BaseOperator):
# this tuple is not used by anything in my operator classes and is not passed anywhere
template_fields = ('local_path', 's3_path', )
...
def py_fn(task_instance, **context):
task_instance.xcom_push(key='file_name', value='file.name')
py_task = PythonOperator(
dag=dag,
task_id='py_task',
provide_context=True,
python_callable=py_fn
)
s3_task = s3BucketUploadOperator(
dag=dag,
task_id='s3_task',
s3_conn_id='?',
local_path="path/to/dir/{{ task_instance.xcom_pull(key='file_name', task_ids='py_task') }}",
s3_path="path/to/s3/dir/{{ task_instance.xcom_pull(key='file_name', task_ids='py_task') }}"
)
编辑
如果要使用 BashOperator,bash_command="echo {{ task_instance.xcom_pull(key='file_name', task_ids='py_task') }}"
应将文件名打印到任务日志。
推荐阅读
- javascript - 当行包含一定数量的列时调用javascript函数
- php - 如何将自定义选项传递给自定义 Symfony FormType 的树枝模板?
- macos - 在没有 ./ 的情况下在 zsh 中运行脚本之前在 /usr/local/bin 中有效,但在 ~/bin 中无效
- scala - 用户类抛出异常:scala.ScalaReflectionException:classxyxz
- json - NewtonSoft.Json 为所有人返回 null
- python - ImportError:导入_sqlite3时DLL加载失败:找不到指定的模块
- javascript - 在 JS 中装饰函数或对象
- nginx - 带有拆分客户端的 nginx A/B 测试
- python - 如果我使用网格搜索 cv,是否需要做 CV?
- variadic-functions - 通过 JNA 从 Java 调用 C varargs 函数