首页 > 解决方案 > 未找到 Airflow Composer 自定义模块 - PythonVirtualenvOperator

问题描述

我在 GCP Composer 中有一个非常简单的 Airflow 实例设置。它有桶和一切。我想设置每个 dag 以使用 PythonVirtualenvOperator 运行它自己的环境。

其中的结构如下:

dags ->
------> code_snippets/
----------> print_name.py - has function called print_my_name() which prints a string into the terminal
------> test_dag.py

test_dag.py:

import datetime
from airflow.operators.python_operator import PythonVirtualenvOperator
from airflow import DAG


def main_func():
    import pandas as pd
    import datetime

    from code_snippets.print_name import print_my_name
    print_my_name()

    df = pd.DataFrame(data={
        'date': [str(datetime.datetime.now().date())]
    })

    print(df)

default_args = {
    'owner': 'test_dag',
    'start_date': datetime.datetime(2020, 7, 3, 5, 1, 00),
    'concurrency': 1,
    'retries': 0
}

dag = DAG('test_dag', description='Test DAGS with environment',
          schedule_interval='0 5 * * *',
          default_args=default_args, catchup=False)

test_the_dag = PythonVirtualenvOperator(
    task_id="test_dag",
    python_callable=main_func,
    python_version='3.8',
    requirements=["DateTime==4.3", "numpy==1.20.2", "pandas==1.2.4", "python-dateutil==2.8.1", "pytz==2021.1",
                  "six==1.15.0", "zope.interface==5.4.0"],
    system_site_packages=False,
    dag=dag,
)

test_the_dag

一切正常,直到我开始导入自定义模块 - 有一个init .py 并没有帮助,它仍然给出相同的错误,在我的情况下是:

from code_snippets.print_name import print_my_name\nModuleNotFoundError: No module named \'code_snippets\'

我也有一个本地的 Airflow 实例,我遇到了同样的问题。我尝试过移动东西或将文件夹的路径添加到 PATH,在目录中添加 inits 甚至更改导入语句,但只要我导入自定义模块,错误就会持续存在。

system_site_packages=False 或 True 也无效

是否有解决方法或解决方法,以便我可以利用我在 DAG 之外分离的自定义代码?

气流版本:1.10.14+composer 气流的 Python 版本设置为:3

标签: python-3.xairflowgoogle-cloud-composer

解决方案


的实现airflow.operators.python.PythonVirtualenvOperator是这样的,python_callable预计不会引用外部名称。

可调用中使用的任何非标准库包都必须在requirements.txt文件中声明为外部依赖项。

如果您需要使用code_snippets,请将其作为包发布到 pypi 或 VCS 存储库,并将其添加到 .kwargs 中的包列表requirementsPythonVirtualenvOperator


推荐阅读