python - 在 Airflow 的 EmailOperator 中访问 Xcom
问题描述
我对 Airflow 很陌生,我在使用 Xcom 和 Jinja 时遇到了一些问题。
我必须做一些 Python 阐述,然后将结果传递给 EmailOperator 以便将其作为电子邮件正文发送。
似乎不存在关于它的文档,我发现的唯一提示是这个链接,它的格式很糟糕,不起作用,而且评论很糟糕。我认为它必须通过 Jinja 完成,但我不知道如何......
这是我正在做的暂定工作,有人可以帮我解释一下为什么这段代码是错误的以及如何修复它吗?
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 11, 7),
'email': ['ciccio.pasticcio@noreply.it'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'dg_daily_saint',
default_args=default_args,
schedule_interval='10 9 * * *')
task1 = PythonOperator(task_id="extract_daily_saint",
python_callable=extractDailySaint,
provide_context=True,
dag=dag)
def html_output(**context):
value=context['task_instance'].xcom_pull(task_ids='extract_daily_saint', key='saint')
return "<h1>" + value + "</h1>"
EMAIL_CONTENT = """
<b> {{ html_output(context) }}</b>
"""
mail = EmailOperator(
task_id='mail',
to='ciccio.pasticcio@noreply.it',
subject='DataGovernance',
html_content=EMAIL_CONTENT,
provide_context=True,
dag=dag)
task1 >> mail
解决方案
卡住可能是另一个问题,但我对推拉器有点困惑。
Pusher 是将参数推送到另一个运算符的运算符。推杆需要xcom_push=True
。对于PythonOperator
,将推送一个返回值。
Puller 是操作者从 pusher 接收参数。拉拔器需求provide_context=True
因此,在您的示例中
task1 = PythonOperator(task_id="extract_daily_saint",
python_callable=extractDailySaint,
xcom_push=True, # not provide_context
dag=dag)
然后在你的 puller 中,你可以直接在 Jinja 模板中使用宏。
mail = EmailOperator(
task_id='mail',
to='ciccio.pasticcio@noreply.it',
subject='DataGovernance',
html_content="<b><h1> {{ task_instance.xcom_pull(task_ids='extract_daily_saint') }} </h1></b>",
provide_context=True, # puller needs provide_context
dag=dag)
推荐阅读
- jsf - 由于 javax.faces.ViewState,无法从 Postman 加载网页?
- java - 使用 openapi-generator-gradle-plugin 只为 spring boot 生成 REST 接口
- python - 如何动态过滤 Pandas 数据框中的数据?
- javascript - 唯一数组Javascript中具有不同键的数组列表
- python - 处理 pandas Dataframe 中的 no-List
- java - 如何使用 Retrofit 从 OpenWeatherMap API 获取“天气”对象数据
- c# - 使用 DLLImport 从 C# 调用 c++,找不到对象引用错误
- javascript - 无法理解错误:TypeError:无法读取未定义的属性“订阅”
- python - 在python中计算GPS点之间的测地线距离
- c - 标记 doxygen 注释并将它们放在单独的文件中/从源/注释块构建文档