python - Airflow - 损坏的 DAG (Broken DAG):[/var/app/current/dags/product/product_snapshot.py] 'module' 对象不可调用
问题描述
我正在尝试在 Airflow 中构建一个 DAG。由于这是一个新项目，我新建了一个文件夹，用来存放所有与产品相关的 DAG。但 Airflow 报错 Broken DAG：'module' 对象不可调用（'module' object is not callable）。出问题的不是 DAG id，也没有任何函数与该文件同名，所以我不明白这个“module”错误从何而来。对于我新建的文件夹，是否需要做什么额外的处理？
import pandas as pd
import psycopg2
import datetime
import io
from airflow import DAG
from airflow.models import Variable
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator
# ----------------------------------------------------------------------------------------------------------------------
# Connections
# ----------------------------------------------------------------------------------------------------------------------
def _pg_connect(conn_id):
    """Open a new psycopg2 connection using the Airflow connection ``conn_id``.

    The host, port, database (``schema``), login and password all come from
    the Airflow connection record. The caller owns the returned connection and
    must close it.
    """
    hook = BaseHook.get_connection(conn_id)
    return psycopg2.connect(dbname=hook.schema, user=hook.login,
                            password=hook.password, host=hook.host,
                            port=hook.port)


def db1_conn():
    """Return a new connection built from the core credentials (Airflow conn ``db1``)."""
    return _pg_connect('db1')


def db2_write():
    """Return a new core *write* connection (Airflow conn ``db2-write``)."""
    return _pg_connect('db2-write')
# ----------------------------------------------------------------------------------------------------------------------
# Python Callable
# ----------------------------------------------------------------------------------------------------------------------
def run_this():
query = f"""
SELECT DISTINCT ON (userid) 0 AS id,
'{month}'::timestamp AS month,
userid,
balance,
transactiondate AS last_transaction_date,
transactionexternaluniqueid AS last_transaction_unique_id
FROM public.product_view
WHERE transactiondate < '{month}'
ORDER BY userid, transactiondate DESC
"""
data = pd.read_sql(query, db1_conn())
# Index.
index_query = """
SELECT COALESCE(MAX(id), 0)::INT AS id
FROM schema.product_snapshot
"""
index_start = pd.read_sql(index_query, db1_conn)
data['id'] = data.index + index_start + 1
# Write on table.
buf = io.StringIO()
stoplightdata_buffer.write(data.to_csv(index=None, header=None))
buf.seek(0)
conn = core_write()
cur = conn.cursor()
cur.copy_from(buf, table='schema.product_snapshot', sep=',', null='')
conn.commit()
# ----------------------------------------------------------------------------------------------------------------------
# DAG
# ----------------------------------------------------------------------------------------------------------------------
DAG_NAME = 'product_snapshot_dag'

# This file does ``import datetime`` (the module), so the class and timedelta
# must be reached through it. Calling ``datetime(...)`` directly is what raised
# "Broken DAG: 'module' object is not callable", and a bare ``timedelta`` was
# never imported at all.
args = {
    'owner': 'airflow',
    'start_date': datetime.datetime(2020, 3, 22),
    'email': ['email@email.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=2),
    'sla': datetime.timedelta(minutes=30),
    'execution_timeout': datetime.timedelta(minutes=50),
}

# NOTE(review): the cron string '1 * 1 * *' fires at minute 1 of *every hour*
# on the 1st of each month (24 runs that day). If one monthly run was
# intended, this should probably be '1 0 1 * *' — confirm before changing.
dag = DAG(DAG_NAME,
          schedule_interval='1 * 1 * *',
          default_args=args)
# ----------------------------------------------------------------------------------------------------------------------
# Postgres Operator
# ----------------------------------------------------------------------------------------------------------------------
# Runs the table-creation script before the snapshot load.
# NOTE(review): ``sql`` is a path ending in .sql; presumably Airflow resolves
# it as a template file relative to the DAG folder / template_searchpath —
# confirm. ``postgres_conn_id='connection'`` differs from the 'db1' /
# 'db2-write' ids used elsewhere; verify it targets the right database.
product_table_creation = PostgresOperator(
    dag=dag,
    task_id='product_table_creation',
    postgres_conn_id='connection',
    sql='sql/product_table_creation.sql',
)
# ----------------------------------------------------------------------------------------------------------------------
# Python Operator
# ----------------------------------------------------------------------------------------------------------------------
# Task that executes ``run_this``. With the Airflow 1.x PythonOperator,
# provide_context=True passes the task context to the callable as keyword
# arguments.
product_snapshot_data = PythonOperator(
    dag=dag,
    task_id='product_snapshot_data',
    python_callable=run_this,
    provide_context=True,
)

# Same edge as ``product_table_creation >> product_snapshot_data``:
# the table must exist before the snapshot is loaded into it.
product_table_creation.set_downstream(product_snapshot_data)

if __name__ == "__main__":
    # Only when the file is executed directly: hand control to the DAG's CLI.
    dag.cli()
新建的 DAG 文件夹位于现有的 DAG 文件夹之内。
解决方案
推荐阅读
- r - 使用 `rlang` 创建一个`~ x + y` 类型的新公式
- sql-server - Entity Framework Core + Spatial Data is Raising SRID not valid 错误
- python - 按钮和标签未正确放置在框架上
- node.js - 具有服务主体和机密的 node.js REST 端点的 Azure AD 身份验证
- mysql - 如何将两个选定的列组合成一个表?
- functional-programming - Perl 6使用reduce一次计算一个int数组的平均值
- r - 为什么 ggplot 错误地填充了我的曲线下的区域?
- perl - 包括在当前文件中生成的 perl 文件
- phaser-framework - 来回移动平台的代码
- angular - 如何对数据值使用指令?