python - 气流 GKEPodOperator xcom_push 返回无
问题描述
所以我已经为此苦苦挣扎了好几个小时。这是我的运营商的代码:
task1 = GKEPodOperator(
task_id="task1",
project_id="proj",
location="location",
cluster_name="cluster_name",
name="cluster-calculator",
namespace="default",
image=Variable.get("cluster_calculator_image"),
arguments=['--name clustercalculator'],
env_vars=env_vars,
xcom_push=True,
is_delete_operator_pod=True,
get_logs=True,
dag=dag
)
该 pod 运行一个简单的 docker 容器,其中包含 Java 应用程序,其中执行一些操作并将结果写入默认的/airflow/xcom/result.json文件。
这就是我尝试获取 xcom_push 结果的方式:
def print_xcom_result(*op_args, **kwargs):
print(op_args)
print(kwargs['task_instance'].xcom_pull(task_ids='task1'))
test_values = PythonOperator(
task_id="task1_test",
python_callable=print_xcom_result,
provide_context=True,
op_args=["{{task_instance.xcom_pull(task_ids='task1')}}"],
dag=dag
)
无论我尝试什么,它总是打印无。
[2019-10-12 00:06:23,061] {{logging_mixin.py:95}} INFO - ('None',)
[2019-10-12 00:06:23,072] {{logging_mixin.py:95}} INFO - None
当我在 Airflow UI 上访问 XCOM 时,它什么也没有显示。我还尝试了这里的示例:Failed to extract xcom from airflow pod - Kubernetes Pod Operator,它也不起作用。
边车容器是肯定创建的,我在日志中看到了它的输出:
Running command... [1mcat /airflow/xcom/return.json[0m
Running command... [1mkill -s SIGINT 1[0m
INFO[0m - {"clusterSize":2}[0m
我什至尝试在外部运行 docker 容器,验证结果是否正确写入 xcom 目录,但在 DAG 执行期间无法获得此结果。
气流版本是最新的。蟒蛇 3.7
如果这很重要,我有 6 个运行 Airflow 的容器(webserver、flower、worker、scheduler、postgre、rabbitmq)。芹菜是执行者。Pod 在 Google Cloud 上的 Kubernetes Engine 中运行。
没有错误,两个运算符都成功了。
有没有人有任何想法?先感谢您。
解决方案
气流中存在一个错误,其中 GKEPodOperator 的执行没有返回语句
super(GKEPodOperator, self).execute(context)
应该
return super(GKEPodOperator, self).execute(context)
该错误已在气流 2.0 ( https://issues.apache.org/jira/browse/AIRFLOW-4072 )中修复
一个建议可能是创建一个插件 YourOwnGKEOperator 以在本地解决此问题。(并将其部署到您的插件文件夹中)
推荐阅读
- solidity - 为什么这个 txn 没有转移余额,我传递了与响应集完全相同的十六进制字符串
- tensorflow - 在 TFlite micro 中实现 SELU 激活
- apache-spark - Spark 如何处理部分缓存/持久化结果?
- reactjs - 浏览器未定义 - React
- c# - LINQ 中的通配符搜索
- c# - 违反 Entity Framework Core 中的 PRIMARY KEY 约束
- php - 在结帐时更改所有与 WooCommerce 消息相关的优惠券代码的位置
- bash - 如何从 bash 脚本正确启动 ejabberdctl?
- authentication - 试图从苹果获取 transfer_sub 但得到了 invalid_request
- elasticsearch - Elasticsearch 查询这种特殊类型的错字:“bath mat”与“bathmat”