airflow - 将 xcom 从 PythonVirtualenvOperator 推送到 PythonBranchOperator
问题描述
我有以下 DAG:
def load_data():
data = load_data()
n_rows = data.shape[0]
return n_rows
def get_n_rows(**kwargs):
ti = kwargs["ti"]
n_rows = ti.xcom_pull(task_ids="load_data")
if n_rows>10:
return "many_rows_task"
return "not_so_many_rows_task"
with DAG(
'test',
default_args={"provide_context":True},
description='A test',
schedule_interval=timedelta(days=1),
start_date=days_ago(1),
tags=['test']
) as dag:
t1 = PythonVirtualenvOperator(
task_id = "load_data",
requirements = [],
python_callable = load_data
)
t2 = BranchPythonOperator(
task_id = "branch",
python_callable = get_n_rows
问题是ti.xcom_pull("load_data")
在get_n_rows
返回 None 即它的返回值load_data
似乎没有被推送到 xcom。
如果我然后尝试使用xcom_push
而不是仅返回其中的值,则当我将其作为输入添加到ieload_data
时会失败**kwargs
load_data
def load_data(**kwargs):
ti = kwargs["ti"]
ti.xcom_push(value=5)
(即使我只有一个空的身体,但**kwargs
在输入中)
投掷
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/python.py", line 493, in execute
super().execute(context=serializable_context)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/python.py", line 117, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.6/site-packages/airflow/operators/python.py", line 512, in execute_callable
self._write_args(input_filename)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/python.py", line 540, in _write_args
self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
TypeError: can't pickle module objects
XCom在使用时有什么特殊的处理方法PythonVirtualenvOperator
吗?
解决方案
推荐阅读
- javascript - 如何创建js和html css
- sql - 如何优化此 UNION 查询?
- scala - 由 Scala 宏生成时,依赖类型似乎“不起作用”
- python - 给定两点找到斜率 - Python 类
- c++ - 如何让std :: thread在执行其成员函数后自动删除对象
- c++ - 为什么分段错误(核心转储)?
- marklogic - 节点集群(MarkLogic)
- android - android-keystore 的密码是空的吗?
- python - Django模型中的Heroku数据库缺少字段
- c++ - 为什么在尝试创建动态矩阵时抛出 'std::bad_alloc' what(): std:: bad_alloc 的实例后调用终止?