python - Docker-compose with Airflow - MS SQL Server (connection failed)
Question
I cannot connect to SQL Server from Airflow running under docker-compose. I want to transfer data from SQL Server directly to Cloud Storage and then load it into BigQuery.
How can I solve this?
import json
from datetime import timedelta, datetime
from airflow import DAG
from airflow.models import Variable
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
default_args = {
    'owner': 'Test Data',
    'depends_on_past': True,
    'start_date': datetime(2019, 5, 29),
    'end_date': datetime(2019, 5, 30),
    'email': ['email@clientx.com.br'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# Set schedule using cron syntax. Note: "* * * * *" runs every minute.
# To run once a day at an exact time, e.g. 8:15am, use "15 08 * * *".
schedule_interval = "* * * * *"
# Define DAG: Set ID and assign default args and schedule interval
dag = DAG(
    'bigquery_github_trends',
    default_args=default_args,
    schedule_interval=schedule_interval
)
# Extract: run the query and write the result (plus a schema file) to GCS
extract = MySqlToGoogleCloudStorageOperator(
    task_id='chama_extract',
    mysql_conn_id='mysql_hml',
    google_cloud_storage_conn_id='my_gcp_conn',
    sql="""SELECT * FROM test""",
    bucket='my_bucket',
    filename='test/test{}.json',
    schema_filename='schemas/test.json',
    dag=dag)
# Load: read the exported JSON from GCS into a BigQuery table
load = GoogleCloudStorageToBigQueryOperator(
    task_id='chama_load',
    bigquery_conn_id='my_gcp_conn',
    google_cloud_storage_conn_id='my_gcp_conn',
    bucket='my_bucket',
    destination_project_dataset_table="tst.teste123",
    source_objects=['test/test0.json'],
    schema_object='schemas/test.json',
    source_format='NEWLINE_DELIMITED_JSON',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    dag=dag)
# Setting up Dependencies
load.set_upstream(extract)
Docker-compose.yml
version: '3'
services:
  postgres:
    image: postgres:9.6
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"
  webserver:
    image: puckel/docker-airflow:1.10.1
    build:
      context: https://github.com/puckel/docker-airflow.git#1.10.1
      dockerfile: Dockerfile
      args:
        AIRFLOW_DEPS: gcp_api,s3
    restart: always
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
      - FERNET_KEY=jsDPRErfv8Z_eVTnGfF8ywd19j4pyqE3NpdUBA_oRTo=
    volumes:
      - ./examples/intro-example/dags:/usr/local/airflow/dags
      # Uncomment to include custom plugins
      # - ./plugins:/usr/local/airflow/plugins
    ports:
      - "8080:8080"
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3
docker-compose-gcloud.yml
version: '3'
services:
  postgres:
    image: postgres:9.6
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"
  webserver:
    image: puckel/docker-airflow:1.10.1
    build:
      context: https://github.com/puckel/docker-airflow.git#1.10.1
      dockerfile: Dockerfile
      args:
        AIRFLOW_DEPS: gcp_api,s3
    restart: always
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
      - FERNET_KEY=jsDPRErfv8Z_eVTnGfF8ywd19j4pyqE3NpdUBA_oRTo=
    volumes:
      - ./examples/gcloud-example/dags:/usr/local/airflow/dags
      # Uncomment to include custom plugins
      # - ./plugins:/usr/local/airflow/plugins
    ports:
      - "8080:8080"
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3
I start the stack with this command:
docker-compose -f docker-compose-gcloud.yml up --abort-on-container-exit
Error message in Airflow:
[2019-05-29 07:00:37,938] {{logging_mixin.py:95}} INFO - [2019-05-29 07:00:37,937] {{base_hook.py:83}} INFO - Using connection to: 10.0.0.1
[2019-05-29 07:00:58,974] {{models.py:1760}} ERROR - (2003, 'Can\'t connect to MySQL server on 10.0.0.1 (111 "Connection denied")')
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 105, in execute
    cursor = self._query_mysql()
  File "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 127, in _query_mysql
    conn = mysql.get_conn()
  File "/usr/local/lib/python3.6/site-packages/airflow/hooks/mysql_hook.py", line 103, in get_conn
    conn = MySQLdb.connect(**conn_config)
  File "/usr/local/lib/python3.6/site-packages/MySQLdb/__init__.py", line 84, in Connect
    return Connection(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line 164, in __init__
    super(Connection, self).__init__(*args, **kwargs2)
MySQLdb._exceptions.OperationalError: (2003, 'Can\'t connect to MySQL server on 10.0.0.1 (111 "Connection denied")')
[2019-05-29 07:00:58,988] {{models.py:1789}} INFO - All retries failed; marking task as FAILED
[2019-05-29 07:00:58,992] {{logging_mixin.py:95}} INFO - [2019-05-29 07:00:58,991] {{configuration.py:255}} WARNING - section/key [smtp/smtp_user] not found in config
[2019-05-29 07:00:58,998] {{models.py:1796}} ERROR - [Errno 99] Cannot assign requested address
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 105, in execute
    cursor = self._query_mysql()
  File "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 127, in _query_mysql
    conn = mysql.get_conn()
  File "/usr/local/lib/python3.6/site-packages/airflow/hooks/mysql_hook.py", line 103, in get_conn
    conn = MySQLdb.connect(**conn_config)
  File "/usr/local/lib/python3.6/site-packages/MySQLdb/__init__.py", line 84, in Connect
    return Connection(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line 164, in __init__
    super(Connection, self).__init__(*args, **kwargs2)
MySQLdb._exceptions.OperationalError: (2003, 'Can\'t connect to MySQL server on 10.0.0.1 (111 "Connection denied")')
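Answer
Judging from the error, the key part seems to be the get_conn call in the traceback. It shows that Airflow fails at the moment it tries to open a connection to the database. That means the connection is either not defined at all (which looks likely here) or some part of it is wrong.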
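Before changing any Airflow settings, it may help to confirm that the database host is reachable at all from inside the webserver container. The following is a minimal sketch using only the standard library; the function name can_connect is hypothetical, and the ports to try (3306 as the MySQL default, 1433 as the SQL Server default) are assumptions, since the question does not state which port is configured.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to (host, port) opens within timeout."""
    try:
        # create_connection raises OSError (refused, unreachable, timed out)
        # when the TCP handshake does not complete.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Running can_connect("10.0.0.1", 3306) inside the container (for example through docker-compose exec webserver python) and getting False would suggest a network, firewall, or port problem rather than wrong credentials. A True result only shows that something is listening on that port, not that the login will succeed.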
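You should check that the password, server address, and port are correct. These can be defined in your airflow.cfg, as environment variables, or in the webserver UI (Admin panel).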
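For the environment-variable route, Airflow reads a connection from a variable named AIRFLOW_CONN_ followed by the upper-cased connection id, with the value written as a URI. The sketch below builds such a pair for the mysql_hml connection id used in the DAG. The helper name airflow_conn_env is hypothetical, and the login, password, port, and schema values are placeholders, not values from the question.

```python
from urllib.parse import quote


def airflow_conn_env(conn_id, scheme, host, port, login, password, schema=""):
    """Return (variable name, URI) describing an Airflow connection."""
    name = "AIRFLOW_CONN_" + conn_id.upper()
    # Percent-encode the parts that may contain characters such as "@" or "/",
    # which would otherwise break URI parsing.
    uri = "{}://{}:{}@{}:{}/{}".format(
        scheme,
        quote(login, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(schema, safe=""),
    )
    return name, uri


# Placeholder credentials for illustration only.
name, uri = airflow_conn_env(
    "mysql_hml", "mysql", "10.0.0.1", 3306, "user", "p@ss/word", "test"
)
print("{}={}".format(name, uri))
# prints AIRFLOW_CONN_MYSQL_HML=mysql://user:p%40ss%2Fword@10.0.0.1:3306/test
```

The printed line could then be added under the environment key of the webserver service in the compose file, next to LOAD_EX and EXECUTOR.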