首页 > 解决方案 > 气流无法识别使用 pytest 夹具构建的 zip 文件 DAG

问题描述

我们正在使用带有气流 v1.10 和 Python 3.6.8 的 Google Composer(托管 Airflow 服务)。为了部署我们的 DAGS,我们采用 Packaged DAG ( https://airflow.apache.org/concepts.html?highlight=zip#packaged-dags ) 方法。

当从 cmd 行创建 zip 文件时,一切都很好

zip -r dag_under_test.zip test_dag.py

但是当我尝试从 pytest 夹具执行此操作时,我加载了 DagBag 并测试了我的 DAG 的完整性,气流根本无法识别这个 zip 文件。这是我的 pytest 夹具的代码

@fixture
def setup(config):
    os.system("zip -r dag_under_test.zip test_zip.py")


def test_import_dags(setup):
    dagbag = DagBag(include_examples=False)
    noOfDags = len(dagbag.dags)
    dagbag.process_file("dag_under_test.zip")
    assert len(dagbag.dags) == noOfDags + 1, 'DAG import failures. Errors: {}'.format(dagbag.import_errors)

我将此 zip 文件复制到 DAGs 文件夹,但气流根本无法识别它,没有错误消息。但是使用来自 cmdline 的相同命令构建的 zip 文件正在被气流加载!!好像我在这里遗漏了一些明显的东西,无法弄清楚。

标签: pythonzipairflowgoogle-cloud-composer

解决方案


所以事实证明,我在哪里创建 zip 文件很重要。在这种情况下,我从 test 文件夹创建 zip 文件并将文件存档在 src 文件夹中。尽管最终的 zip 文件肉眼看起来很完美,但气流正在拒绝它。我尝试在 zip 命令中添加“-j”(以删除目录名称),然后我的测试开始工作。

zip -r -j dag_under_test_metrics.zip ../src/metricsDAG.py

我还有另一个更大的问题,即当我的 DAG 项目中有一个完整的文件夹结构时测试相同的场景。顶层的 dag 文件,它引用了项目中的许多 python 模块。我无法通过上述技巧来解决这个问题,但想出了一个解决方法。我创建了一个小的 shell 脚本,它执行 zip 部分,就像这样..

SCRIPT_PATH=${0%/*/*}
cd $SCRIPT_PATH

zip -r -q test/dag_under_test.zip DagRunner.py
zip -r -q test/dag_under_test.zip tasks dag common resources

此 shell 脚本将 currentdir 更改为项目主页并从那里存档。我正在像这样从 pytest 夹具调用这个 shell

@fixture
def setup():
    os.system('rm {}'.format(DAG_UNDER_TEST))
    os.system('sh {}'.format(PACKAGE_SCRIPT))
    yield
    print("-------- clean up -----------")
    os.system('rm {}'.format(DAG_UNDER_TEST))

这与我的集成测试完美配合。

def test_conversionDAG(setup):
    configuration.load_test_config()
    dagbag = DagBag(include_examples=False)
    noOfDags = len(dagbag.dags)
    dagbag.process_file(DAG_UNDER_TEST)
    assert len(dagbag.dags) == noOfDags + 1, 'DAG import failures. Errors: {}'.format(dagbag.import_errors)
    assert dagbag.get_dag("name of the dag")

推荐阅读