sql - 带有提交的 Airflow + pandas read_sql_query()
问题描述
问题
我可以使用 read_sql() 将 SQL 事务提交到数据库吗?
用例和背景
我有一个用例,我希望允许用户执行一些预定义的 SQL 并返回一个 pandas 数据框。在某些情况下,此 SQL 将需要查询预先填充的表,而在其他情况下,此 SQL 将执行一个写入表的函数,然后查询该表。此逻辑当前包含在 Airflow DAG 中的方法内部,以便利用使用 PostgresHook 的 Airflow 可访问的数据库连接信息 - 该方法最终在 PythonOperator 任务中调用。我通过测试了解到 PostgresHook 创建了一个 psycopg2 连接对象。
代码
from airflow.hooks.postgres_hook import PostgresHook
import pandas as pd
def create_df(job_id,other_unrelated_inputs):
conn = job_type_to_connection(job_type) # method that helps choose a database
sql = open('/sql_files/job_id_{}.sql'.format(job_id)) #chooses arbitrary SQL
sql_template = sql.read()
hook = PostgresHook(postgres_conn_id=conn) #connection information for alias is predefined elsewhere within Airflow
try:
hook_conn_obj = hook.get_conn()
print(type(hook_conn_obj)) # <class 'psycopg2.extensions.connection'>
# Runs SQL template with variables, but does not commit. Alternatively, have used hook.get_pandas_df(sql_template)
df = pd.io.sql.read_sql(sql_template, con = hook_conn_obj)
except:
#catches some errors#
return df
问题
目前,在执行 SQL 函数时,此代码会生成一个数据帧,但不会提交在 SQL 函数中所做的任何数据库更改。例如,更准确地说,如果 SQL 函数 INSERT 将一行插入到表中,则该事务将不会提交,并且该行将不会出现在表中。
尝试
我尝试了一些修复,但被卡住了。我最近的努力是更改 read_sql 使用的 psycopg2 连接的 autocommit 属性,以便自动提交事务。
我承认我无法弄清楚连接的属性何时会影响 SQL 的执行。
我认识到另一种方法是复制PostgresHook.run()中的一些逻辑以提交,然后添加一些代码以将结果推送到数据帧中,但是对于未来的支持使用已经创建的方法来说,这似乎更加简洁和容易,如果可能的话。
我能找到的最类似的 SO question 是this one,但我对独立于气流的解决方案感兴趣。
编辑
...
try:
hook_conn_obj = hook.get_conn()
print(type(hook_conn_obj)) # <class 'psycopg2.extensions.connection'>
hook_conn_obj.autocommit = True
df = pd.io.sql.read_sql(sql_template, con = hook_conn_obj) # Runs SQL template with variables, but does not commit
except:
#catches some errors#
return df
这似乎有效。如果有人对实现这一目标的更好方法有任何评论或想法,我仍然有兴趣从讨论中学习。
谢谢!
解决方案
read_sql
不会提交,因为正如该方法名称所暗示的那样,目标是读取数据,而不是写入。这是很好的设计选择pandas
。这很重要,因为它可以防止意外写入并允许有趣的场景,例如运行过程、读取其效果但没有任何内容被持久化。read_sql
的意图是阅读,而不是写作。直接表达意图是黄金标准原则。
表达您的意图的一种更明确的方式是execute
在fetchall
. 但是由于pandas
没有提供从对象读取的简单方法cursor
,您将失去由提供的轻松自在的心态,read_sql
并且必须自己创建 DataFrame。
所以总而言之,您的解决方案很好,通过设置autocommit=True
您表明您的数据库交互将持续存在,因此应该不会发生意外。读起来有点奇怪,但是如果您将sql_template
变量命名为类似的名称write_then_read_sql
或在文档字符串中进行解释,则意图会更清楚。
推荐阅读
- powershell - 创建计划任务时出错
- jquery - 使用 jquery waterwheel 添加自动滑动
- apache-spark - Pyspark 挂在简单的命令上
- botframework - 机器人对话生成器
- r - 基于其他列实际值和先前值的值替换
- apache-storm - java.lang.NoSuchMethodError: org.apache.curator.framework.api.CreateBuilder.creatingParentsIfNeeded()
- ios - 无法写入 plist URL
- ios - WKWebView 在外部浏览器中打开 PDF
- google-sheets-api - 通过 api 从 Google 表格中删除自动链接格式
- css - SASS 函数迭代 nth-child(n+1) 和变亮/变暗($color, 5n%)