How do I determine the cause of an authentication failure in the Airflow PostgresOperator?

Problem description

I'm new to Airflow. As a first learning exercise, I created a minimal DAG that writes a value to a postgres database table every hour. However, I can't establish a connection to the postgres database.

Note that I'm not talking about creating a postgres backend database in order to use the LocalExecutor, as described in this question. That's a different topic (at least the way I understand it).

I created a minimal example in which I:

  1. create a Linux user, a postgresql role and a database, all with the same name minimal_db
  2. create a DAG consisting of a single PostgresOperator
  3. create the connection in the Admin tab of the Airflow webserver

1. Creating the Linux user, postgres role and database

For this part I relied on this article.

Logged in as the postgres user, I created a role named minimal_db and a database with the same name. Next I also created a Linux superuser with the same name. This last step didn't seem necessary, since I don't think I need ident authentication, but it is mentioned in the article I followed:

To log in using ident based authentication, you'll need a Linux user with the same name as your Postgres role and database.

postgres@ws:~$ createuser --interactive
postgres@ws:~$ createdb minimal_db
gontcharovd@ws:~$ sudo adduser minimal_db

This is the connection info for the database:

minimal_db=# \conninfo
You are connected to database "minimal_db" as user "minimal_db" via socket in "/var/run/postgresql" at port "5432".

I don't know what the socket in "/var/run/postgresql" means.

I created a table named my_table and inserted a value:

minimal_db=# CREATE TABLE my_table (my_value INT NOT NULL);
minimal_db=# INSERT INTO my_table VALUES (123);

I checked that the value is present in my_table:

minimal_db=# SELECT * FROM my_table;

my_value
----------
      123
(1 row)

The postgres server indicates that it accepts connections:

gontcharovd@ws:~$ pg_isready -h localhost -p 5432
localhost:5432 - accepting connections

2. The minimal Airflow DAG

The DAG I use is absolutely minimal: it writes the value 123 to my_table in minimal_db every hour.

import airflow
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

dag = DAG(
    'minimal_example',
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval='@hourly'
)

write_to_postgres = PostgresOperator(
    task_id='write_to_postgres',
    postgres_conn_id='minimal_db_id',
    sql='INSERT INTO my_table VALUES (123);',
    dag=dag
)

write_to_postgres

3. The Airflow connection created in the Admin tab

This is the connection whose postgres_conn_id I use in the DAG:

Conn Id: minimal_db_id
Conn Type: Postgres
Host: localhost
Schema: minimal_db
Login: minimal_db
Password: [same password as the minimal_db role]
Port: 5432
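
Airflow's PostgresHook hands these values to psycopg2.connect (see the traceback below), so the same credentials can be checked outside Airflow with a plain psycopg2 call. A minimal sketch, assuming psycopg2 is installed; the password below is a placeholder for the role's actual password:

# Minimal sketch: try the Airflow connection values directly with psycopg2.
import psycopg2

conn = psycopg2.connect(
    host='localhost',              # the Host field
    port=5432,                     # the Port field
    dbname='minimal_db',           # Airflow's Schema field is used as the database name
    user='minimal_db',             # the Login field
    password='<role password>',    # placeholder for the minimal_db role's password
)
with conn, conn.cursor() as cur:
    cur.execute('SELECT * FROM my_table;')
    print(cur.fetchall())
conn.close()

If this call fails with the same error, the problem lies with the postgres credentials or pg_hba.conf rather than with Airflow itself.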

4. Error in the write_to_postgres task

I keep getting a password authentication failure, even though I'm sure the password of the minimal_db postgres role matches the password in the minimal_db_id connection:

[2020-06-05 20:42:51,845] {taskinstance.py:900} INFO - Executing <Task(PostgresOperator): write_to_postgres> on 2020-06-03T00:00:00+00:00
[2020-06-05 20:42:51,847] {standard_task_runner.py:53} INFO - Started process 59141 to run task
[2020-06-05 20:42:51,875] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: minimal_example.write_to_postgres 2020-06-03T00:00:00+00:00 [running]> ws
[2020-06-05 20:42:51,882] {postgres_operator.py:62} INFO - Executing: INSERT INTO my_table VALUES (123);
[2020-06-05 20:42:51,884] {logging_mixin.py:112} INFO - [2020-06-05 20:42:51,884] {base_hook.py:87} INFO - Using connection to: id: minimal_db_id. Host: localhost, Port: 5432, Schema: minimal_db, Login: minimal_db, Password: XXXXXXXX, extra: None
[2020-06-05 20:42:51,892] {taskinstance.py:1145} ERROR - FATAL:  password authentication failed for user "minimal_db"
FATAL:  password authentication failed for user "minimal_db"
Traceback (most recent call last):
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/operators/postgres_operator.py", line 65, in execute
    self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/hooks/dbapi_hook.py", line 162, in run
    with closing(self.get_conn()) as conn:
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/hooks/postgres_hook.py", line 93, in get_conn
    self.conn = psycopg2.connect(**conn_args)
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/psycopg2/__init__.py", line 127, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: FATAL:  password authentication failed for user "minimal_db"
FATAL:  password authentication failed for user "minimal_db"

[2020-06-05 20:42:51,894] {taskinstance.py:1189} INFO - Marking task as FAILED.dag_id=minimal_example, task_id=write_to_postgres, execution_date=20200603T000000, start_date=20200605T184251, end_date=20200605T184251
[2020-06-05 20:43:01,840] {logging_mixin.py:112} INFO - [2020-06-05 20:43:01,840] {local_task_job.py:103} INFO - Task exited with return code 1

I tried changing the Host field in the minimal_db_id definition to /var/run/postgresql. This results in a peer authentication failure instead of a password authentication failure:

*** Reading local file: /home/gontcharovd/airflow/logs/minimal_example/write_to_postgres/2020-06-04T18:00:00+00:00/2.log
[2020-06-05 21:25:36,787] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: minimal_example.write_to_postgres 2020-06-04T18:00:00+00:00 [queued]>
[2020-06-05 21:25:36,793] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: minimal_example.write_to_postgres 2020-06-04T18:00:00+00:00 [queued]>
[2020-06-05 21:25:36,793] {taskinstance.py:879} INFO - 
--------------------------------------------------------------------------------
[2020-06-05 21:25:36,793] {taskinstance.py:880} INFO - Starting attempt 2 of 2
[2020-06-05 21:25:36,793] {taskinstance.py:881} INFO - 
--------------------------------------------------------------------------------
[2020-06-05 21:25:36,798] {taskinstance.py:900} INFO - Executing <Task(PostgresOperator): write_to_postgres> on 2020-06-04T18:00:00+00:00
[2020-06-05 21:25:36,800] {standard_task_runner.py:53} INFO - Started process 64696 to run task
[2020-06-05 21:25:36,836] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: minimal_example.write_to_postgres 2020-06-04T18:00:00+00:00 [running]> ws
[2020-06-05 21:25:36,843] {postgres_operator.py:62} INFO - Executing: INSERT INTO my_table VALUES (123);
[2020-06-05 21:25:36,846] {logging_mixin.py:112} INFO - [2020-06-05 21:25:36,846] {base_hook.py:87} INFO - Using connection to: id: minimal_db_id. Host: /var/run/postgresql/, Port: 5432, Schema: minimal_db, Login: minimal_db, Password: XXXXXXXX, extra: None
[2020-06-05 21:25:36,848] {taskinstance.py:1145} ERROR - FATAL:  Peer authentication failed for user "minimal_db"
Traceback (most recent call last):
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/operators/postgres_operator.py", line 65, in execute
    self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/hooks/dbapi_hook.py", line 162, in run
    with closing(self.get_conn()) as conn:
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/hooks/postgres_hook.py", line 93, in get_conn
    self.conn = psycopg2.connect(**conn_args)
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/psycopg2/__init__.py", line 127, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: FATAL:  Peer authentication failed for user "minimal_db"

[2020-06-05 21:25:36,850] {taskinstance.py:1189} INFO - Marking task as FAILED.dag_id=minimal_example, task_id=write_to_postgres, execution_date=20200604T180000, start_date=20200605T192536, end_date=20200605T192536
[2020-06-05 21:25:46,786] {logging_mixin.py:112} INFO - [2020-06-05 21:25:46,785] {local_task_job.py:103} INFO - Task exited with return code 1

I don't know what else I can try.

Tags: python, postgresql, airflow

Solution


I found the problem and the solution.

My mistake was that I mixed up the password of the minimal_db Linux user with that of the minimal_db postgres role. I dropped the postgres role and recreated it with an explicit password:

CREATE USER minimal_db WITH PASSWORD '123'; 

I entered this password in the connection fields in the Admin tab of the Airflow webserver. Now the values are written to the database correctly.
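
To confirm the fix end to end, the same connection id can also be queried through a PostgresHook, which is what PostgresOperator uses under the hood. A minimal sketch, assuming the Airflow 1.10-style import used in the DAG above:

# Minimal sketch: verify the repaired connection with the hook the operator uses.
from airflow.hooks.postgres_hook import PostgresHook

hook = PostgresHook(postgres_conn_id='minimal_db_id')
rows = hook.get_records('SELECT * FROM my_table;')
print(rows)  # should contain (123,) once the DAG has written to the table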

