MySqlOperator in Airflow 2.0.1 fails with "SSL connection error"

Problem description

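I am new to Airflow and I am trying to test a MySQL connection using MySqlOperator in Airflow 2.0.1. However, I get an SSL connection error. I tried adding an extra parameter to disable SSL mode, but I still get the same error.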

Here is my code. I tried to pass an SSL parameter (ssl_mode = DISABLED) in the code, but it does not work.

from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

with DAG(
    'mysqlConnTest',
    default_args=default_args,
    schedule_interval='@once',
    catchup=False,
) as dag:

    start_task = DummyOperator(task_id='start_task')

    # [START howto_operator_mysql]
    # Attempt to disable SSL. Note: `parameters` is forwarded to the SQL
    # query (see hook.run(...) in the traceback below), not to the connection.
    select_table_mysql_task = MySqlOperator(
        task_id='select_table_mysql',
        mysql_conn_id='mysql',
        sql="SELECT * FROM country;",
        autocommit=True,
        parameters={'ssl_mode': 'DISABLED'},
    )

    start_task >> select_table_mysql_task

This is the error:

*** Reading local file: /opt/airflow/logs/mysqlHookConnTest/select_table_mysql/2021-04-14T12:46:42.221662+00:00/2.log
[2021-04-14 12:47:46,791] {taskinstance.py:851} INFO - Dependencies all met for <TaskInstance: mysqlHookConnTest.select_table_mysql 2021-04-14T12:46:42.221662+00:00 [queued]>
[2021-04-14 12:47:47,007] {taskinstance.py:851} INFO - Dependencies all met for <TaskInstance: mysqlHookConnTest.select_table_mysql 2021-04-14T12:46:42.221662+00:00 [queued]>
[2021-04-14 12:47:47,047] {taskinstance.py:1042} INFO - 
--------------------------------------------------------------------------------
[2021-04-14 12:47:47,054] {taskinstance.py:1043} INFO - Starting attempt 2 of 2
[2021-04-14 12:47:47,074] {taskinstance.py:1044} INFO - 
--------------------------------------------------------------------------------
[2021-04-14 12:47:47,331] {taskinstance.py:1063} INFO - Executing <Task(MySqlOperator): select_table_mysql> on 2021-04-14T12:46:42.221662+00:00
[2021-04-14 12:47:47,377] {standard_task_runner.py:52} INFO - Started process 66 to run task
[2021-04-14 12:47:47,402] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'mysqlHookConnTest', 'select_table_mysql', '2021-04-14T12:46:42.221662+00:00', '--job-id', '142', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/MySqlHookConnTest.py', '--cfg-path', '/tmp/tmppujnrey3', '--error-file', '/tmp/tmpjl_g_p3t']
[2021-04-14 12:47:47,413] {standard_task_runner.py:77} INFO - Job 142: Subtask select_table_mysql
[2021-04-14 12:47:47,556] {logging_mixin.py:104} INFO - Running <TaskInstance: mysqlHookConnTest.select_table_mysql 2021-04-14T12:46:42.221662+00:00 [running]> on host ea95b9685a31
[2021-04-14 12:47:47,672] {taskinstance.py:1257} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=mysqlHookConnTest
AIRFLOW_CTX_TASK_ID=select_table_mysql
AIRFLOW_CTX_EXECUTION_DATE=2021-04-14T12:46:42.221662+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-04-14T12:46:42.221662+00:00
[2021-04-14 12:47:47,687] {mysql.py:72} INFO - Executing: SELECT idPais, Nombre, codigo, paisPlataforma, create_date, update_date FROM ob_cpanel.cpanel_pais;
[2021-04-14 12:47:47,710] {base.py:74} INFO - Using connection to: id: mysql. Host: sys-sql-pre-01.oneboxtickets.net, Port: 3306, Schema: , Login: lectura, Password: None, extra: None
[2021-04-14 12:47:48,134] {taskinstance.py:1455} ERROR - (2006, 'SSL connection error: error:1425F102:SSL routines:ssl_choose_client_version:unsupported protocol')
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/mysql/operators/mysql.py", line 74, in execute
    hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/hooks/dbapi.py", line 173, in run
    with closing(self.get_conn()) as conn:
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/mysql/hooks/mysql.py", line 144, in get_conn
    return MySQLdb.connect(**conn_config)
  File "/home/airflow/.local/lib/python3.6/site-packages/MySQLdb/__init__.py", line 85, in Connect
    return Connection(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/MySQLdb/connections.py", line 208, in __init__
    super(Connection, self).__init__(*args, **kwargs2)
_mysql_exceptions.OperationalError: (2006, 'SSL connection error: error:1425F102:SSL routines:ssl_choose_client_version:unsupported protocol')
[2021-04-14 12:47:48,143] {taskinstance.py:1503} INFO - Marking task as FAILED. dag_id=mysqlHookConnTest, task_id=select_table_mysql, execution_date=20210414T124642, start_date=20210414T124746, end_date=20210414T124748
[2021-04-14 12:47:48,243] {local_task_job.py:146} INFO - Task exited with return code 1

We then tried removing the last two parameters from the DAG code and instead filled in the Extra field of the connection (in the Airflow UI) with this JSON:

{"ssl": false}

The task then failed with another, similar error:

/opt/airflow/logs/mysqlOperatorConnTest/select_table_mysql/2021-04-15T11:26:50.578333+00:00/2.log
*** Fetching from: http://airflow-worker-0.airflow-worker.airflow.svc.cluster.local:8793/log/mysqlOperatorConnTest/select_table_mysql/2021-04-15T11:26:50.578333+00:00/2.log
[2021-04-15 11:27:54,471] {taskinstance.py:851} INFO - Dependencies all met for <TaskInstance: mysqlOperatorConnTest.select_table_mysql 2021-04-15T11:26:50.578333+00:00 [queued]>
[2021-04-15 11:27:54,497] {taskinstance.py:851} INFO - Dependencies all met for <TaskInstance: mysqlOperatorConnTest.select_table_mysql 2021-04-15T11:26:50.578333+00:00 [queued]>
[2021-04-15 11:27:54,497] {taskinstance.py:1042} INFO - 
--------------------------------------------------------------------------------
[2021-04-15 11:27:54,497] {taskinstance.py:1043} INFO - Starting attempt 2 of 2
[2021-04-15 11:27:54,497] {taskinstance.py:1044} INFO - 
--------------------------------------------------------------------------------
[2021-04-15 11:27:54,507] {taskinstance.py:1063} INFO - Executing <Task(MySqlOperator): select_table_mysql> on 2021-04-15T11:26:50.578333+00:00
[2021-04-15 11:27:54,510] {standard_task_runner.py:52} INFO - Started process 115 to run task
[2021-04-15 11:27:54,514] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'mysqlOperatorConnTest', 'select_table_mysql', '2021-04-15T11:26:50.578333+00:00', '--job-id', '68', '--pool', 'default_pool', '--raw', '--subdir', '/opt/airflow/dags/repo/MySqlOperatorConnTest.py', '--cfg-path', '/tmp/tmpy7bv58_z', '--error-file', '/tmp/tmpaoe808of']
[2021-04-15 11:27:54,514] {standard_task_runner.py:77} INFO - Job 68: Subtask select_table_mysql
[2021-04-15 11:27:54,644] {logging_mixin.py:104} INFO - Running <TaskInstance: mysqlOperatorConnTest.select_table_mysql 2021-04-15T11:26:50.578333+00:00 [running]> on host airflow-worker-0.airflow-worker.airflow.svc.cluster.local
[2021-04-15 11:27:54,707] {logging_mixin.py:104} WARNING - /opt/python/site-packages/sqlalchemy/sql/coercions.py:518 SAWarning: Coercing Subquery object into a select() for use in IN(); please pass a select() construct explicitly
[2021-04-15 11:27:54,725] {taskinstance.py:1255} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=mysqlOperatorConnTest
AIRFLOW_CTX_TASK_ID=select_table_mysql
AIRFLOW_CTX_EXECUTION_DATE=2021-04-15T11:26:50.578333+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-04-15T11:26:50.578333+00:00
[2021-04-15 11:27:54,726] {mysql.py:72} INFO - Executing: SELECT idPais, Nombre, codigo, paisPlataforma, create_date, update_date FROM ob_cpanel.cpanel_pais;
[2021-04-15 11:27:54,744] {connection.py:337} ERROR - Expecting value: line 2 column 9 (char 11)
Traceback (most recent call last):
  File "/opt/python/site-packages/airflow/models/connection.py", line 335, in extra_dejson
    obj = json.loads(self.extra)
  File "/usr/local/lib/python3.8/json/__init__.py", line 357, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.8/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.8/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 2 column 9 (char 11)
[2021-04-15 11:27:54,744] {connection.py:338} ERROR - Failed parsing the json for conn_id mysql
[2021-04-15 11:27:54,744] {base.py:65} INFO - Using connection to: id: mysql. Host: sys-sql-pre-01.oneboxtickets.net, Port: 3306, Schema: , Login: lectura, Password: XXXXXXXX, extra: None
[2021-04-15 11:27:54,745] {connection.py:337} ERROR - Expecting value: line 2 column 9 (char 11)
Traceback (most recent call last):
  File "/opt/python/site-packages/airflow/models/connection.py", line 335, in extra_dejson
    obj = json.loads(self.extra)
  File "/usr/local/lib/python3.8/json/__init__.py", line 357, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.8/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.8/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 2 column 9 (char 11)
[2021-04-15 11:27:54,745] {connection.py:338} ERROR - Failed parsing the json for conn_id mysql
[2021-04-15 11:27:54,745] {connection.py:337} ERROR - Expecting value: line 2 column 9 (char 11)
Traceback (most recent call last):
  File "/opt/python/site-packages/airflow/models/connection.py", line 335, in extra_dejson
    obj = json.loads(self.extra)
  File "/usr/local/lib/python3.8/json/__init__.py", line 357, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.8/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.8/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 2 column 9 (char 11)
[2021-04-15 11:27:54,745] {connection.py:338} ERROR - Failed parsing the json for conn_id mysql
[2021-04-15 11:27:54,746] {connection.py:337} ERROR - Expecting value: line 2 column 9 (char 11)
Traceback (most recent call last):
  File "/opt/python/site-packages/airflow/models/connection.py", line 335, in extra_dejson
    obj = json.loads(self.extra)
  File "/usr/local/lib/python3.8/json/__init__.py", line 357, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.8/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.8/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 2 column 9 (char 11)
[2021-04-15 11:27:54,746] {connection.py:338} ERROR - Failed parsing the json for conn_id mysql
[2021-04-15 11:27:54,746] {connection.py:337} ERROR - Expecting value: line 2 column 9 (char 11)
Traceback (most recent call last):
  File "/opt/python/site-packages/airflow/models/connection.py", line 335, in extra_dejson
    obj = json.loads(self.extra)
  File "/usr/local/lib/python3.8/json/__init__.py", line 357, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.8/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.8/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 2 column 9 (char 11)
[2021-04-15 11:27:54,746] {connection.py:338} ERROR - Failed parsing the json for conn_id mysql
[2021-04-15 11:27:54,746] {connection.py:337} ERROR - Expecting value: line 2 column 9 (char 11)
Traceback (most recent call last):
  File "/opt/python/site-packages/airflow/models/connection.py", line 335, in extra_dejson
    obj = json.loads(self.extra)
  File "/usr/local/lib/python3.8/json/__init__.py", line 357, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.8/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.8/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 2 column 9 (char 11)
[2021-04-15 11:27:54,747] {connection.py:338} ERROR - Failed parsing the json for conn_id mysql
[2021-04-15 11:27:54,747] {connection.py:337} ERROR - Expecting value: line 2 column 9 (char 11)
Traceback (most recent call last):
  File "/opt/python/site-packages/airflow/models/connection.py", line 335, in extra_dejson
    obj = json.loads(self.extra)
  File "/usr/local/lib/python3.8/json/__init__.py", line 357, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.8/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.8/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 2 column 9 (char 11)
[2021-04-15 11:27:54,747] {connection.py:338} ERROR - Failed parsing the json for conn_id mysql
[2021-04-15 11:27:54,747] {connection.py:337} ERROR - Expecting value: line 2 column 9 (char 11)
Traceback (most recent call last):
  File "/opt/python/site-packages/airflow/models/connection.py", line 335, in extra_dejson
    obj = json.loads(self.extra)
  File "/usr/local/lib/python3.8/json/__init__.py", line 357, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.8/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.8/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 2 column 9 (char 11)
[2021-04-15 11:27:54,747] {connection.py:338} ERROR - Failed parsing the json for conn_id mysql
[2021-04-15 11:27:54,787] {taskinstance.py:1455} ERROR - (2006, 'SSL connection error: error:1425F102:SSL routines:ssl_choose_client_version:unsupported protocol')
Traceback (most recent call last):
  File "/opt/python/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/python/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/opt/python/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/mysql/operators/mysql.py", line 74, in execute
    hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
  File "/opt/python/site-packages/airflow/hooks/dbapi.py", line 173, in run
    with closing(self.get_conn()) as conn:
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/mysql/hooks/mysql.py", line 144, in get_conn
    return MySQLdb.connect(**conn_config)
  File "/home/airflow/.local/lib/python3.8/site-packages/MySQLdb/__init__.py", line 85, in Connect
    return Connection(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/MySQLdb/connections.py", line 208, in __init__
    super(Connection, self).__init__(*args, **kwargs2)
_mysql_exceptions.OperationalError: (2006, 'SSL connection error: error:1425F102:SSL routines:ssl_choose_client_version:unsupported protocol')
[2021-04-15 11:27:54,788] {taskinstance.py:1496} INFO - Marking task as FAILED. dag_id=mysqlOperatorConnTest, task_id=select_table_mysql, execution_date=20210415T112650, start_date=20210415T112754, end_date=20210415T112754
[2021-04-15 11:27:54,845] {local_task_job.py:146} INFO - Task exited with return code 1
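
The repeated "Failed parsing the json for conn_id mysql" lines in the log above suggest that the text stored in the Extra field was not valid JSON, so the SSL setting probably never reached the MySQL hook. A minimal sketch of checking an Extra string before saving it, using only the standard library; the helper name `parse_extra` is hypothetical, and the Python-style `False` in the usage note is just one example of a value that JSON rejects, not necessarily what was entered here:

```python
import json


def parse_extra(extra: str) -> dict:
    """Parse a connection Extra string and fail loudly if it is not a JSON object."""
    try:
        parsed = json.loads(extra)
    except json.JSONDecodeError as exc:
        # Same class of error as the "Expecting value" lines in the log.
        raise ValueError(f"Extra is not valid JSON: {exc}") from exc
    if not isinstance(parsed, dict):
        raise ValueError("Extra must be a JSON object, e.g. {\"ssl\": false}")
    return parsed


# JSON booleans are lowercase, so this parses cleanly.
print(parse_extra('{"ssl": false}'))
```

Calling `parse_extra('{"ssl": False}')` raises `ValueError`, because `False` with a capital F is Python syntax rather than JSON.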

Tags: airflow, airflow-2.x

Solution


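We solved this by downgrading the MySQL client to 5.7. Our server version is 5.6, and the previous client was version 8 because I was using the Docker image. So we downgraded the client to a version closer to the server's.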
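
A mismatch like this can be flagged before any task runs by comparing the client and server version strings. The following is only a sketch under assumptions: the helper names are hypothetical, the version strings are illustrative, and in a real check they might come from `MySQLdb.get_client_info()` on the worker and `SELECT VERSION()` on the server.

```python
from typing import Tuple


def parse_version(text: str) -> Tuple[int, int]:
    """Return (major, minor) from a version string such as '5.6.51-log'."""
    major, minor = text.split(".")[:2]
    return int(major), int(minor)


def major_versions_differ(client: str, server: str) -> bool:
    """True when client and server are on different major versions."""
    return parse_version(client)[0] != parse_version(server)[0]


# Illustrative values matching the situation described above:
# an 8.x client talking to a 5.6 server.
if major_versions_differ("8.0.23", "5.6.51-log"):
    print("Client and server major versions differ; check TLS compatibility.")
```

A differing major version does not prove that the SSL handshake will fail; it is just a cheap hint that the client and server may not agree on a protocol version.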

