python - 雪花查询中的编程错误
问题描述
我有一个 DAG,它在函数内执行雪花 sql 语句。SQL 是一个插入语句,将记录从一个表插入到另一个表。插入时,源表中的一些记录有问题,没有插入到目标表中。这导致插入过程失败。
要求:
我想跳过错误记录并将下一条记录从源表加载到目标表。
我想将错误记录写入 GCS 存储桶中的文件。
from airflow import DAG from airflow.operators import python_operator from datetime import datetime import logging from my_folder import param_file from snowflake.connector.errors import ProgrammingError logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) args = { "owner": "test_dag", "start_date": datetime(2021, 3, 2, 10, 1), 'depends_on_past': False, 'email': ['mymailid@example.com'], 'email_on_failure': True } cur = param_file.cur ## param_file is a python file that contains the connection details to Snowflake query = """insert into employees(first_name, last_name, workphone, city,postal_code) select contractor_first,contractor_last,worknum,null,zip_code from contractors""" def run_sql(**context): try: cur.execute(query) except ProgrammingError as db_ex: print(f"Programming error: {db_ex}") with DAG( dag_id="EXCEPTION_testing", schedule_interval=None, max_active_runs=1,catchup=False,default_args=args) as dag: task1= python_operator.PythonOperator( task_id='snowflake_query_execution', python_callable=run_sql, dag=dag)
我曾尝试使用 try 和 except,但任务是为错误记录引发异常,并且不处理源表中的下一个记录。
任何帮助表示赞赏。
解决方案
推荐阅读
- api - 如何通过 API 获取 Google Cloud Translation 使用的字符?
- asp.net-mvc-4 - 插入包含标识列 System.Data.Entity.Infrastructure.DbUpdateException 的表时出错
- r - R:使用数据帧作为另一个数据帧的参数编写函数
- sql - 如何调用递归查询?
- ruby - 连接食谱厨师的多个数据包
- jquery - 计时器完成的 ajax 请求是否会延长会话?
- python - 如何使用 mraa 将 lsm9ds1 连接到扬声器?
- android - NullPointerException 使用改造响应
- html - 当我们按 Ctrl+P 时,CSS 的内容颜色在 Firefox 打印机上不起作用
- android - 有没有办法指定在 snapdragon 820 的性能核心上运行 android ndk 应用程序?