Push XCom from PythonVirtualenvOperator to BranchPythonOperator

Problem description

I have the following DAG:


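from datetime import timedelta

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonVirtualenvOperator
from airflow.utils.dates import days_ago


def load_data():
    # Placeholder for the real data-loading call (not shown in the question).
    data = load_data()
    n_rows = data.shape[0]
    return n_rows


def get_n_rows(**kwargs):
    ti = kwargs["ti"]
    # Pull the return value that the "load_data" task is expected to push.
    n_rows = ti.xcom_pull(task_ids="load_data")
    if n_rows > 10:
        return "many_rows_task"
    return "not_so_many_rows_task"


with DAG(
    'test',
    # Not required in Airflow 2: the context is passed to callables that accept **kwargs.
    default_args={"provide_context": True},
    description='A test',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    tags=['test'],
) as dag:

    t1 = PythonVirtualenvOperator(
        task_id="load_data",
        requirements=[],
        python_callable=load_data,
    )

    t2 = BranchPythonOperator(
        task_id="branch",
        python_callable=get_n_rows,
    )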
The problem is that ti.xcom_pull("load_data") in get_n_rows returns None, i.e. the return value of load_data does not seem to be pushed to XCom.
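One way to check the branching logic on its own is to call get_n_rows with a stub object in place of ti. A minimal sketch follows; FakeTI is a hypothetical stand-in, not an Airflow class, and the None guard is an addition that is not in the question's code. It turns the missing-XCom case into an explicit error instead of the TypeError Python 3 raises for None > 10.

```python
from typing import Any, Dict, Optional


class FakeTI:
    """Hypothetical stand-in for Airflow's TaskInstance: XComs live in a dict."""

    def __init__(self, xcoms: Optional[Dict[str, Any]] = None) -> None:
        self._xcoms = xcoms or {}

    def xcom_pull(self, task_ids: str) -> Any:
        # A missing value comes back as None, as observed in the question.
        return self._xcoms.get(task_ids)


def get_n_rows(**kwargs: Any) -> str:
    ti = kwargs["ti"]
    n_rows = ti.xcom_pull(task_ids="load_data")
    if n_rows is None:
        # Nothing was pushed; comparing None > 10 would raise TypeError.
        raise ValueError("no XCom from 'load_data'; was its return value pushed?")
    if n_rows > 10:
        return "many_rows_task"
    return "not_so_many_rows_task"


print(get_n_rows(ti=FakeTI({"load_data": 42})))  # many_rows_task
print(get_n_rows(ti=FakeTI({"load_data": 3})))   # not_so_many_rows_task
```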

If I instead try to push the value with xcom_push rather than just returning it, load_data fails as soon as I add **kwargs to its signature, i.e.:

def load_data(**kwargs):
    ti = kwargs["ti"]
    # xcom_push needs a key as well as a value; "n_rows" is an illustrative key.
    ti.xcom_push(key="n_rows", value=5)

(This happens even if the function body is empty, as long as **kwargs is in the signature.)

It throws:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python.py", line 493, in execute
    super().execute(context=serializable_context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python.py", line 512, in execute_callable
    self._write_args(input_filename)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python.py", line 540, in _write_args
    self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
TypeError: can't pickle module objects
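In the traceback, _write_args pickles {'args': self.op_args, 'kwargs': self.op_kwargs}, so every value in op_kwargs must be picklable. The sketch below reproduces that failure with the standard pickle module and adds a small helper to find the offending keys. It assumes that once load_data accepts **kwargs, context values end up in op_kwargs and that one of them is a module object; which key that is in a real Airflow run is an assumption, and the dictionary here is made up.

```python
import pickle
import types
from typing import Any, Dict, List


def unpicklable_keys(kwargs: Dict[str, Any]) -> List[str]:
    """Return the keys whose values cannot be pickled on their own."""
    bad = []
    for key, value in kwargs.items():
        try:
            pickle.dumps(value)
        except Exception:
            bad.append(key)
    return bad


# Hypothetical context: plain strings pickle fine, a module object does not.
fake_module = types.ModuleType("fake_module")
context = {"ds": "2021-01-01", "run_id": "manual__1", "macros": fake_module}

try:
    pickle.dumps({"args": [], "kwargs": context})
except TypeError as exc:
    print("pickle failed:", exc)

print(unpicklable_keys(context))  # ['macros']
```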

Is there anything special about handling XCom when using PythonVirtualenvOperator?
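The traceback hints at the general shape of what PythonVirtualenvOperator does: pickle the callable's arguments to a file (_write_args), run the callable in a separate Python process, and read a pickled result back. The sketch below imitates that round trip with plain subprocess and pickle. It is a simplified illustration under that assumption, not Airflow's implementation; count_rows, RUNNER and run_isolated are hypothetical names.

```python
import os
import pickle
import subprocess
import sys
import tempfile
import textwrap
from typing import Any, Dict, List

# Script executed by a separate interpreter: load the pickled args,
# call the function, and pickle its return value to an output file.
RUNNER = textwrap.dedent(
    """
    import pickle, sys

    def count_rows(rows):
        return len(rows)  # hypothetical stand-in for the real callable

    with open(sys.argv[1], "rb") as f:
        payload = pickle.load(f)
    result = count_rows(*payload["args"], **payload["kwargs"])
    with open(sys.argv[2], "wb") as f:
        pickle.dump(result, f)
    """
)


def run_isolated(args: List[Any], kwargs: Dict[str, Any]) -> Any:
    with tempfile.TemporaryDirectory() as tmp:
        input_path = os.path.join(tmp, "input.pkl")
        output_path = os.path.join(tmp, "output.pkl")
        script_path = os.path.join(tmp, "script.py")
        # Everything sent across must be picklable (compare _write_args above).
        with open(input_path, "wb") as f:
            pickle.dump({"args": args, "kwargs": kwargs}, f)
        with open(script_path, "w") as f:
            f.write(RUNNER)
        subprocess.run([sys.executable, script_path, input_path, output_path], check=True)
        with open(output_path, "rb") as f:
            # The caller only sees the result because it is returned here.
            return pickle.load(f)


print(run_isolated([[1, 2, 3]], {}))  # 3
```

In this sketch the value reaches the caller only because run_isolated returns it. Line 493 of the traceback reads super().execute(context=serializable_context) with no return; if that is the complete statement in the installed Airflow version, the callable's result would be discarded before it could be pushed to XCom, which would be consistent with xcom_pull returning None.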

Tags: airflow

Solution

