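airflow - Airflow - Great Expectations - Sending evaluation parameters to the GreatExpectationsOperator
Problem description
For anyone using Great Expectations in Airflow: does anyone know whether evaluation parameters can be passed through the Airflow GreatExpectationsOperator? I am currently trying this and getting the following error:
airflow.exceptions.AirflowException: Invalid arguments were passed to GreatExpectationsOperator (task_id: my_task). Invalid arguments were: **kwargs: {'evaluation_parameters': {}}
Thanks,
Solution
Assuming the GreatExpectationsOperator you are using is the one from the https://github.com/great-expectations/airflow-provider-great-expectations repo, it only supports the following parameters, which are documented at https://registry.astronomer.io/providers/great-expectations/modules/greatexpectationsoperator and listed here: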
:param run_name: Identifies the validation run (defaults to timestamp if not specified)
:type run_name: Optional[str]
:param data_context_root_dir: Path of the great_expectations directory
:type data_context_root_dir: str
:param data_context: A great_expectations DataContext object
:type data_context: dict
:param expectation_suite_name: The name of the Expectation Suite to use for validation
:type expectation_suite_name: str
:param batch_kwargs: The batch_kwargs to use for validation
:type batch_kwargs: dict
:param assets_to_validate: A list of dictionaries of batch_kwargs + Expectation Suites to use for validation
:type assets_to_validate: iterable
:param checkpoint_name: A Checkpoint name to use for validation
:type checkpoint_name: str
:param fail_task_on_validation_failure: Fail the Airflow task if the Great Expectation validation fails
:type fail_task_on_validation_failure: bool
:param validation_operator_name: name of a Great Expectations validation operator, defaults to action_list_operator
:type validation_operator_name: Optional[str]
:param **kwargs: kwargs
:type **kwargs: Optional[dict]
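`evaluation_parameters` is not among these parameters. The trailing `**kwargs` does not make the operator accept arbitrary arguments: it is passed on to Airflow's `BaseOperator`, which rejects any keyword it does not recognize, and that is where the error in the question comes from. Check the following example_dag from the same link: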
"""
A DAG that demonstrates implementation of the GreatExpectationsOperator.
Note: you will need to reference the necessary data assets and expectation suites in your project. You can find samples available in the provider source directory.
To view steps on running this DAG, check out the Provider Readme: https://github.com/great-expectations/airflow-provider-great-expectations#examples
"""
import logging
import os
import airflow
from airflow import DAG
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator
from great_expectations_provider.operators.great_expectations_bigquery import GreatExpectationsBigQueryOperator
default_args = {
    "owner": "Airflow",
    "start_date": airflow.utils.dates.days_ago(1)
}
dag = DAG(
    dag_id='example_great_expectations_dag',
    default_args=default_args
)
# This runs an expectation suite against a sample data asset. You may need to change these paths if you do not have your `data`
# directory living in a top-level `include` directory. Ensure the checkpoint yml files have the correct path to the data file.
base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
data_file = os.path.join(base_path, 'include',
                         'data/yellow_tripdata_sample_2019-01.csv')
ge_root_dir = os.path.join(base_path, 'include', 'great_expectations')
ge_batch_kwargs_pass = GreatExpectationsOperator(
    task_id='ge_batch_kwargs_pass',
    expectation_suite_name='taxi.demo',
    batch_kwargs={
        'path': data_file,
        'datasource': 'data__dir'
    },
    data_context_root_dir=ge_root_dir,
    dag=dag,
)
# This runs an expectation suite against a data asset that passes the tests
ge_batch_kwargs_list_pass = GreatExpectationsOperator(
    task_id='ge_batch_kwargs_list_pass',
    assets_to_validate=[
        {
            'batch_kwargs': {
                'path': data_file,
                'datasource': 'data__dir'
            },
            'expectation_suite_name': 'taxi.demo'
        }
    ],
    data_context_root_dir=ge_root_dir,
    dag=dag,
)
# This runs a checkpoint that will pass. Make sure the checkpoint yml file has the correct path to the data file.
ge_checkpoint_pass = GreatExpectationsOperator(
    task_id='ge_checkpoint_pass',
    run_name='ge_airflow_run',
    checkpoint_name='taxi.pass.chk',
    data_context_root_dir=ge_root_dir,
    dag=dag
)
# This runs a checkpoint that will fail. Make sure the checkpoint yml file has the correct path to the data file.
ge_checkpoint_fail = GreatExpectationsOperator(
    task_id='ge_checkpoint_fail',
    run_name='ge_airflow_run',
    checkpoint_name='taxi.fail.chk',
    data_context_root_dir=ge_root_dir,
    dag=dag
)
# This runs a checkpoint that will fail, but we set a flag to exit the task successfully.
ge_checkpoint_fail_but_continue = GreatExpectationsOperator(
    task_id='ge_checkpoint_fail_but_continue',
    run_name='ge_airflow_run',
    checkpoint_name='taxi.fail.chk',
    fail_task_on_validation_failure=False,
    data_context_root_dir=ge_root_dir,
    dag=dag
)
# This runs a checkpoint and passes in a root dir.
ge_checkpoint_pass_root_dir = GreatExpectationsOperator(
    task_id='ge_checkpoint_pass_root_dir',
    run_name='ge_airflow_run',
    checkpoint_name='taxi.pass.chk',
    data_context_root_dir=ge_root_dir,
    dag=dag
)
ge_batch_kwargs_list_pass >> ge_checkpoint_pass_root_dir >> ge_batch_kwargs_pass >> ge_checkpoint_fail_but_continue >> ge_checkpoint_pass >> ge_checkpoint_fail
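Why `evaluation_parameters` is rejected while the arguments used in the example DAG are accepted can be seen in a small stdlib-only model. This is a sketch, not Airflow or provider code: `SketchAirflowException`, `SketchBaseOperator` and `GreatExpectationsOperatorSketch` are hypothetical names, the subclass declares only a subset of the documented parameters, and it assumes (as Airflow 2's `BaseOperator` does by default) that any keyword still left over when the base class runs is an error.

```python
# Simplified, hypothetical model of how Airflow's BaseOperator reacts to
# keyword arguments that no operator class consumed. This is NOT Airflow code;
# every class name below is illustrative.


class SketchAirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException."""


class SketchBaseOperator:
    def __init__(self, task_id, **kwargs):
        # Any keyword still in kwargs here was not claimed by a subclass,
        # so it is reported as invalid.
        if kwargs:
            raise SketchAirflowException(
                f"Invalid arguments were passed to {type(self).__name__} "
                f"(task_id: {task_id}). Invalid arguments were:\n**kwargs: {kwargs}"
            )
        self.task_id = task_id


class GreatExpectationsOperatorSketch(SketchBaseOperator):
    """Hypothetical operator naming a subset of the documented parameters."""

    def __init__(
        self,
        task_id,
        run_name=None,
        data_context_root_dir=None,
        expectation_suite_name=None,
        batch_kwargs=None,
        checkpoint_name=None,
        fail_task_on_validation_failure=True,
        **kwargs,
    ):
        # Named parameters are consumed here; anything else, such as
        # evaluation_parameters, is forwarded to the base class unchanged.
        super().__init__(task_id=task_id, **kwargs)
        self.run_name = run_name
        self.data_context_root_dir = data_context_root_dir
        self.expectation_suite_name = expectation_suite_name
        self.batch_kwargs = batch_kwargs
        self.checkpoint_name = checkpoint_name
        self.fail_task_on_validation_failure = fail_task_on_validation_failure


if __name__ == "__main__":
    # A documented parameter is accepted.
    ok = GreatExpectationsOperatorSketch(task_id="my_task", checkpoint_name="taxi.pass.chk")
    print(ok.checkpoint_name)  # taxi.pass.chk

    # An undocumented parameter falls through **kwargs and is rejected.
    try:
        GreatExpectationsOperatorSketch(task_id="my_task", evaluation_parameters={})
    except SketchAirflowException as exc:
        print(exc)
```

The point the model makes is that `**kwargs` on an operator subclass is a pass-through to the base class, not a catch-all: an argument only works if some class in the chain declares it by name.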