首页 > 解决方案 > 气流 - 损坏的 DAG:[/var/app/current/dags/product/product_snapshot.py] 'module' 对象不可调用

问题描述

我正在尝试在 Airflow 中构建 DAG。因为它是一个新项目,所以我创建了一个新文件夹,其中包含所有与产品相关的 DAG。我得到 Broken DAG 'module' 对象不可调用。不是 DAG id,也没有任何函数被命名为文件,所以我不明白“模块”错误。我应该对我创建的新文件夹做任何事情吗

import pandas as pd
import psycopg2
import datetime
import io
from airflow import DAG
from airflow.models import Variable
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator

# ----------------------------------------------------------------------------------------------------------------------
# Connections
# ----------------------------------------------------------------------------------------------------------------------


def db1_conn():
    # Core Credentials
    db1_hook = BaseHook.get_connection('db1')
    db1_conn = psycopg2.connect(dbname=db1_hook.schema, user=db1_hook.login,
                                 password=db1_hook.password, host=db1_hook.host,
                                 port=db1_hook.port)
    return db1_conn


def db2_write():
    # Core Write
    db2_hook = BaseHook.get_connection('db2-write')
    db2_conn = psycopg2.connect(dbname=db2_hook.schema, user=db2_hook.login,
                                 password=db2_hook.password, host=db2_hook.host,
                                 port=db2_hook.port)
    return db2_conn


# ----------------------------------------------------------------------------------------------------------------------
# Python Callable
# ----------------------------------------------------------------------------------------------------------------------


def run_this():
    query = f"""
            SELECT DISTINCT ON (userid) 0                                    AS id,
                                        '{month}'::timestamp                 AS month,
                                        userid,
                                        balance,
                                        transactiondate                      AS last_transaction_date,
                                        transactionexternaluniqueid          AS last_transaction_unique_id
            FROM public.product_view
            WHERE transactiondate < '{month}'
            ORDER BY userid, transactiondate DESC
            """
    data = pd.read_sql(query, db1_conn())
    # Index.
    index_query = """
                  SELECT COALESCE(MAX(id), 0)::INT AS id
                  FROM schema.product_snapshot
                  """
    index_start = pd.read_sql(index_query, db1_conn)
    data['id'] = data.index + index_start + 1
    # Write on table.
    buf = io.StringIO()
    stoplightdata_buffer.write(data.to_csv(index=None, header=None))
    buf.seek(0)
    conn = core_write()
    cur = conn.cursor()
    cur.copy_from(buf, table='schema.product_snapshot', sep=',', null='')
    conn.commit()


# ----------------------------------------------------------------------------------------------------------------------
# DAG
# ----------------------------------------------------------------------------------------------------------------------

DAG_NAME = 'product_snapshot_dag'

args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 3, 22),
    'email': [ 'email@email.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=2),
    'sla': timedelta(minutes=30),
    'execution_timeout': timedelta(minutes=50)
}

dag = DAG(DAG_NAME,
          schedule_interval='1 * 1 * *',
          default_args=args)

# ----------------------------------------------------------------------------------------------------------------------
# Postgres Operator
# ----------------------------------------------------------------------------------------------------------------------

product_table_creation = PostgresOperator(task_id='product_table_creation',
                                          postgres_conn_id='connection',
                                          sql='sql/product_table_creation.sql',
                                          dag=dag)

# ----------------------------------------------------------------------------------------------------------------------
# Python Operator
# ----------------------------------------------------------------------------------------------------------------------


product_snapshot_data = PythonOperator(task_id='product_snapshot_data',
                                       provide_context=True,
                                       python_callable=run_this,
                                       dag=dag)


product_table_creation >> product_snapshot_data


if __name__ == "__main__":
    dag.cli()

DAG 新文件夹位于其他 DAG 文件夹中。

标签: pythonairflow

解决方案


推荐阅读