Airflow: psycopg2.ProgrammingError: can't adapt type 'PythonOperator'

Problem description

Running the DAG fails with the following error:

psycopg2.ProgrammingError: can't adapt type 'PythonOperator'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1165, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1283, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1313, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 150, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 161, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/crypto_etl.py", line 61, in create_text_file
    close_data = ti.xcom_pull(key=None, task_ids=[transform_data])
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1966, in xcom_pull
    for result in query.with_entities(XCom.task_id, XCom.value)
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
    return self._execute_and_instances(context)
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
    distilled_params,
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
    e, statement, parameters, cursor, context
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'PythonOperator'
[SQL: SELECT xcom.task_id AS xcom_task_id, xcom.value AS xcom_value 
FROM xcom 
WHERE xcom.task_id IN (%(task_id_1)s) AND xcom.dag_id = %(dag_id_1)s AND xcom.execution_date = %(execution_date_1)s ORDER BY xcom.execution_date DESC, xcom.timestamp DESC]
[parameters: {'task_id_1': <Task(PythonOperator): transform_data>, 'dag_id_1': 'crypto_analysis', 'execution_date_1': DateTime(2021, 9, 30, 10, 0, 0, tzinfo=Timezone('UTC'))}]
(Background on this error at: http://sqlalche.me/e/13/f405)
[2021-09-30 10:02:37,270] {taskinstance.py:1513} INFO - Marking task as FAILED. dag_id=crypto_analysis, task_id=create_text_file, execution_date=20210930T100000, start_date=20210930T100236, end_date=20210930T100237
[2021-09-30 10:02:37,463] {local_task_job.py:151} INFO - Task exited with return code 1
[2021-09-30 10:02:37,693] {local_task_job.py:261} INFO - 0 downstream tasks scheduled from follow-on schedule check

Here is the code:

def transform_data(**kwargs):
    ti = kwargs['ti']
    binance = []
    ftx = []
    bybit = []
    binance_ohlcv_data, ftx_ohlcv_data, fetch_bybit_ohlcv = ti.xcom_pull(key=None, task_ids=['fetch_binance_ohlcv',
                                                                                             'fetch_ftx_ohlcv',
                                                                                             'fetch_bybit_ohlcv'])
    for record in binance_ohlcv_data:
        binance.append({'ts': record[0], 'close': record[4], 'volume': record[5]})

    for record in ftx_ohlcv_data:
        ftx.append({'ts': record[0], 'close': record[4], 'volume': record[5]})

    for record in fetch_bybit_ohlcv:
        bybit.append({'ts': record[0], 'close': record[4], 'volume': record[5]})

    # return {'binance': binance, 'ftx': ftx, 'bybit': bybit}
    return binance


def create_text_file(**kwargs):
    ti = kwargs['ti']
    today = datetime.now()
    time_part = today.strftime('%d%m%y%H%M%S')
    # binance_ohlcv_data, ftx_ohlcv_data, fetch_bybit_ohlcv = ti.xcom_pull(key=None, task_ids=['fetch_binance_ohlcv',
    #                                                                                          'fetch_ftx_ohlcv',
    #                                                                                          'fetch_bybit_ohlcv'])

    close_data = ti.xcom_pull(key=None, task_ids=[transform_data])
    print(close_data)

Tags: python, sqlalchemy, airflow, airflow-scheduler

Solution


The problem is in this line:

close_data = ti.xcom_pull(key=None, task_ids=[transform_data])

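Here transform_data is a Python object, not a string. The logged query parameters show it as <Task(PythonOperator): transform_data>, and psycopg2 cannot adapt that object into a SQL parameter. ti.xcom_pull expects task_ids to be a task ID string or a list of task ID strings, so pass the IDs as strings, as you did before: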

ti.xcom_pull(key=None, task_ids=['fetch_binance_ohlcv', 
                                 'fetch_ftx_ohlcv', 
                                 'fetch_bybit_ohlcv'])
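
For the failing line in create_text_file, the same idea can be sketched as below. This is a minimal illustration, not the asker's full DAG: FakeTaskInstance is a hypothetical stand-in so the snippet runs without Airflow, and it only mimics the documented xcom_pull behaviour of returning a single value for a string and a list of values for a list of strings. It assumes the task ID is 'transform_data', which is what the logged parameters suggest.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance (illustration only)."""

    def __init__(self, xcoms):
        # Maps task_id -> the value that task returned (its XCom).
        self._xcoms = xcoms

    def xcom_pull(self, task_ids=None, key="return_value"):
        # A single string yields one value; a list of strings yields a list.
        if isinstance(task_ids, str):
            return self._xcoms.get(task_ids)
        return [self._xcoms.get(task_id) for task_id in task_ids]


def create_text_file(**kwargs):
    ti = kwargs["ti"]
    # Pass the task ID as a string, not the operator object.
    close_data = ti.xcom_pull(task_ids="transform_data")
    print(close_data)
    return close_data


# Simulated run: pretend transform_data pushed one record.
fake_ti = FakeTaskInstance(
    {"transform_data": [{"ts": 1, "close": 2.0, "volume": 3.0}]}
)
result = create_text_file(ti=fake_ti)
```

Passing task_ids=['transform_data'] (a one-element list) should also work, but the result is then wrapped in a list, so the string form is usually more convenient when pulling from a single task.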
