首页 > 解决方案 > 将模块导入气流上的虚拟环境

问题描述

我有一个打包的 DAG,我将模块存储在子文件夹中。我正在使用 PythonVirtualenvOperator 并希望从虚拟环境访问此模块。

文件夹系统 -

dags/
    packaged_dag.zip/
        dag.py
        package/
            my_module.py
            __init__.py

dag.py

from airflow.operators.python_operator import PythonVirtualenvOperator

def my_function(**kwargs):
    from package import my_module


with models.DAG(
default_args=default_dag_args) as dag:
virtualenv_task = PythonVirtualenvOperator(
    task_id="virtualenv_python",
    python_callable=my_function,
    system_site_packages=True,
    dag=dag,
)

为此,我将找不到包模块。如果我将导入移动到主 dag 文件(如使用 PythonVirtualenvOperator) - 它会正常工作,但我想要来自 virtualenv 的文件。

标签: pythonvirtualenvairflowvirtual-environment

解决方案


这对我有用:

dags/
  folder/
    my_module.py
  main.py

主文件

default_args = {
    "owner": "airflow"}

dag = DAG(
    dag_id='example',
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
)

def my_module_virtualenv():
    """
    Example function that will be performed in a virtual environment.

    Importing at the module level ensures that it will not attempt to import the
    library before it is installed.
    """
    from dags.folder.my_module import main as my_module
    my_module()

check_database = PythonVirtualenvOperator(
    task_id="my_module",
    python_callable=my_module_virtualenv,
    requirements=["psycopg2-binary==2.8.6"],
    system_site_packages=True,
    dag=dag,
)

这不是标准做法,因为没有__init__.py,但它有效。__init__.py当我试图这样做时给我带来了麻烦docker-compose up airflow-init


推荐阅读