google-cloud-sql - 如何在云作曲家的气流中使用云 sql 执行 PostgreSQL SELECT 查询?
问题描述
我是云作曲家的新手,我想在云作曲家的气流中使用 gcp_cloud_sql 挂钩执行一个 PostgreSQL SELECT 查询。我尝试使用 CloudSqlQueryOperator,但它不适用于 SELECT 查询。
我想根据从此选择查询中获得的结果创建 DAG。但是,我无法为此 SELECT 查询创建甚至简单的连接。
from six.moves.urllib.parse import quote_plus
import airflow
from airflow import models
from airflow.contrib.operators.gcp_sql_operator import (
CloudSqlQueryOperator
)
from datetime import date, datetime, timedelta
GCP_PROJECT_ID = "adtech-dev"
GCP_REGION = "<my cluster zone>"
GCSQL_POSTGRES_INSTANCE_NAME_QUERY = "testpostgres"
GCSQL_POSTGRES_DATABASE_NAME = ""
GCSQL_POSTGRES_USER = "<PostgreSQL User Name>"
GCSQL_POSTGRES_PASSWORD = "**********"
GCSQL_POSTGRES_PUBLIC_IP = "0.0.0.0"
GCSQL_POSTGRES_PUBLIC_PORT = "5432"
rule_query = "select r.id from rules r where r.id = 1"
postgres_kwargs = dict(
user=quote_plus(GCSQL_POSTGRES_USER),
password=quote_plus(GCSQL_POSTGRES_PASSWORD),
public_port=GCSQL_POSTGRES_PUBLIC_PORT,
public_ip=quote_plus(GCSQL_POSTGRES_PUBLIC_IP),
project_id=quote_plus(GCP_PROJECT_ID),
location=quote_plus(GCP_REGION),
instance=quote_plus(GCSQL_POSTGRES_INSTANCE_NAME_QUERY),
database=quote_plus(GCSQL_POSTGRES_DATABASE_NAME)
)
default_args = {
'owner': 'airflow',
'start_date': datetime(2018, 5, 31),
'email': ['aniruddha.dwivedi@xyz.com'],
'email_on_failure': True,
'email_on_retry': False,
'depends_on_past': False,
'catchup': False,
'retries': 3,
'retry_delay': timedelta(minutes=10),
}
os.environ['AIRFLOW_CONN_PROXY_POSTGRES_TCP'] = \
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
"database_type=postgres&" \
"project_id={project_id}&" \
"location={location}&" \
"instance={instance}&" \
"use_proxy=True&" \
"sql_proxy_use_tcp=True".format(**postgres_kwargs)
connection_names = [
"proxy_postgres_tcp"
]
tasks = []
with models.DAG(
dag_id='example_gcp_sql_query',
default_args=default_args,
schedule_interval=None
) as dag:
prev_task = None
for connection_name in connection_names:
task = CloudSqlQueryOperator(
gcp_cloudsql_conn_id=connection_name,
task_id="example_gcp_sql_task_" + connection_name,
sql=rule_query
)
tasks.append(task)
if prev_task:
prev_task >> task
prev_task = task
解决方案
推荐阅读
- c++ - 如何用 C++ 编写映射程序
- c# - 使用 2 个嵌套形式,但内部形式未在 asp.net 中显示
- blockchain - 如何在 Solana 上与其他人的公共程序互动?
- c# - 单击按钮后禁用使用 CSSclass 的 asp.net C# gridview 样式
- if-statement - 将数据从多列转换为行并保留“标签”
- javascript - 在 Angular 11 中获取 http://localhost:4200/account/runtime.js net::ERR_ABORTED 404(未找到)
- python - 尝试使用开源库为我的数据集预测性别,但仅在预测性别中显示男性
- reactjs - react.js 重新渲染组件仅在为 state 赋予另一个值时才起作用,与以前不同
- reactjs - React Financial Chart 未显示图表并抛出错误 - 在严格模式树中检测到旧版上下文 API
- docker - Docker 容器无法使用容器名称访问另一个容器