Airflow - Great Expectations - Sending evaluation parameters to the GreatExpectationsOperator

Problem Description

For anyone using Great Expectations with Airflow: does anyone know whether it is possible to pass evaluation parameters through the Airflow GreatExpectationsOperator? I am currently trying this and getting the following error:

airflow.exceptions.AirflowException: Invalid arguments were passed to GreatExpectationsOperator (task_id: my_task). Invalid arguments were: **kwargs: {'evaluation_parameters': {}}

Thanks,

Tags: airflow, great-expectations

Solution


Assuming you are using the GreatExpectationsOperator from the https://github.com/great-expectations/airflow-provider-great-expectations repo, it only supports the parameters listed at https://registry.astronomer.io/providers/great-expectations/modules/greatexpectationsoperator, reproduced here:

    :param run_name: Identifies the validation run (defaults to timestamp if not specified)
    :type run_name: Optional[str]
    :param data_context_root_dir: Path of the great_expectations directory
    :type data_context_root_dir: str
    :param data_context: A great_expectations DataContext object
    :type data_context: dict
    :param expectation_suite_name: The name of the Expectation Suite to use for validation
    :type expectation_suite_name: str
    :param batch_kwargs: The batch_kwargs to use for validation
    :type batch_kwargs: dict
    :param assets_to_validate: A list of dictionaries of batch_kwargs + Expectation Suites to use for validation
    :type assets_to_validate: iterable
    :param checkpoint_name: A Checkpoint name to use for validation
    :type checkpoint_name: str
    :param fail_task_on_validation_failure: Fail the Airflow task if the Great Expectation validation fails
    :type  fail_task_on_validation_failure: bool
    :param validation_operator_name: name of a Great Expectations validation operator, defaults to action_list_operator
    :type validation_operator_name: Optional[str]
    :param **kwargs: kwargs
    :type **kwargs: Optional[dict]
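
Note that evaluation_parameters is not among these parameters, so Airflow rejects it as an invalid keyword argument when the task is instantiated, which is exactly the AirflowException shown in the question. A minimal illustration of a call that would trigger it (the task_id, suite name, and paths below are hypothetical):

# Hypothetical instantiation reproducing the error: this operator version
# does not declare `evaluation_parameters`, so Airflow raises
# "Invalid arguments were passed to GreatExpectationsOperator".
failing_task = GreatExpectationsOperator(
    task_id='my_task',
    expectation_suite_name='taxi.demo',                            # hypothetical suite
    batch_kwargs={'path': 'data.csv', 'datasource': 'data__dir'},  # hypothetical batch
    data_context_root_dir='/path/to/great_expectations',
    evaluation_parameters={},   # <-- not a supported argument, rejected by Airflow
    dag=dag,
)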

Check out the following example_dag from the linked repo:

"""
A DAG that demonstrates implementation of the GreatExpectationsOperator. 
Note: you will need to reference the necessary data assets and expectation suites in your project. You can find samples available in the provider source directory.
To view steps on running this DAG, check out the Provider Readme: https://github.com/great-expectations/airflow-provider-great-expectations#examples
"""

import logging
import os
import airflow
from airflow import DAG
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator
from great_expectations_provider.operators.great_expectations_bigquery import GreatExpectationsBigQueryOperator

default_args = {
    "owner": "Airflow",
    "start_date": airflow.utils.dates.days_ago(1)
}

dag = DAG(
    dag_id='example_great_expectations_dag',
    default_args=default_args
)

# This runs an expectation suite against a sample data asset. You may need to change these paths if you do not have your `data`
# directory living in a top-level `include` directory. Ensure the checkpoint yml files have the correct path to the data file.
base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
data_file = os.path.join(base_path, 'include',
                         'data/yellow_tripdata_sample_2019-01.csv')
ge_root_dir = os.path.join(base_path, 'include', 'great_expectations')


ge_batch_kwargs_pass = GreatExpectationsOperator(
    task_id='ge_batch_kwargs_pass',
    expectation_suite_name='taxi.demo',
    batch_kwargs={
        'path': data_file,
        'datasource': 'data__dir'
    },
    data_context_root_dir=ge_root_dir,
    dag=dag,
)

# This runs an expectation suite against a data asset that passes the tests
ge_batch_kwargs_list_pass = GreatExpectationsOperator(
    task_id='ge_batch_kwargs_list_pass',
    assets_to_validate=[
        {
            'batch_kwargs': {
                'path': data_file,
                'datasource': 'data__dir'
            },
            'expectation_suite_name': 'taxi.demo'
        }
    ],
    data_context_root_dir=ge_root_dir,
    dag=dag,
)

# This runs a checkpoint that will pass. Make sure the checkpoint yml file has the correct path to the data file.
ge_checkpoint_pass = GreatExpectationsOperator(
    task_id='ge_checkpoint_pass',
    run_name='ge_airflow_run',
    checkpoint_name='taxi.pass.chk',
    data_context_root_dir=ge_root_dir,
    dag=dag
)

# This runs a checkpoint that will fail. Make sure the checkpoint yml file has the correct path to the data file.
ge_checkpoint_fail = GreatExpectationsOperator(
    task_id='ge_checkpoint_fail',
    run_name='ge_airflow_run',
    checkpoint_name='taxi.fail.chk',
    data_context_root_dir=ge_root_dir,
    dag=dag
)

# This runs a checkpoint that will fail, but we set a flag to exit the task successfully.
ge_checkpoint_fail_but_continue = GreatExpectationsOperator(
    task_id='ge_checkpoint_fail_but_continue',
    run_name='ge_airflow_run',
    checkpoint_name='taxi.fail.chk',
    fail_task_on_validation_failure=False,
    data_context_root_dir=ge_root_dir,
    dag=dag
)

# This runs a checkpoint and passes in a root dir.
ge_checkpoint_pass_root_dir = GreatExpectationsOperator(
    task_id='ge_checkpoint_pass_root_dir',
    run_name='ge_airflow_run',
    checkpoint_name='taxi.pass.chk',
    data_context_root_dir=ge_root_dir,
    dag=dag
)
 
ge_batch_kwargs_list_pass >> ge_checkpoint_pass_root_dir >> ge_batch_kwargs_pass >> ge_checkpoint_fail_but_continue >> ge_checkpoint_pass >> ge_checkpoint_fail
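
If you do need to pass evaluation parameters, one workaround is to skip the operator and call Great Expectations directly from a PythonOperator, since the validate() call itself accepts them. The sketch below assumes the legacy (v2) batch API used by this provider version and Airflow 2 import paths; the key inside evaluation_parameters is purely illustrative:

from airflow.operators.python import PythonOperator
import great_expectations as ge


def _validate_with_eval_params(**_):
    # Load the project and fetch the same batch the operator examples use.
    context = ge.data_context.DataContext(ge_root_dir)
    batch = context.get_batch(
        {'path': data_file, 'datasource': 'data__dir'},
        'taxi.demo',
    )
    # Unlike the operator, validate() exposes evaluation_parameters directly.
    results = batch.validate(
        evaluation_parameters={'expected_min_value': 0}  # hypothetical parameter
    )
    if not results.success:
        raise ValueError('Great Expectations validation failed')


ge_validate_with_eval_params = PythonOperator(
    task_id='ge_validate_with_eval_params',
    python_callable=_validate_with_eval_params,
    dag=dag,
)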
