python - 如何确定 Airflow PostgresOperator 中身份验证失败的原因?
问题描述
我是气流新手。作为第一次学习练习,我创建了一个最小的 DAG,它每小时将一个值写入 postgres 数据库表。但是我无法建立与 postgres 数据库的连接。
请注意,我不是在谈论创建 postgres 后端数据库以使用本问题中描述的本地执行器。那是一个不同的话题(至少我理解它的方式。)
我创建了一个最小的示例,其中:
- 创建 Linux 用户、postgresql 角色和数据库都同名
minimal_db
- 创建一个由一个组成的 DAG
PostgresOperator
- 在 Airflow 网络服务器的管理选项卡中创建连接
1. 创建 Linux 用户、postgres 角色和数据库
对于这部分,我依赖于这篇文章。
我登录了 postgres 用户并创建了一个名为minimal_db
I created a database with same name 的角色。接下来我还创建了一个同名的 Linux 超级用户。这最后一步似乎没有必要,因为我认为我不需要身份验证。但是在我关注的文章中提到了它:
要使用基于身份的身份验证登录,您需要一个与您的 Postgres 角色和数据库同名的 Linux 用户。
postgres@ws:~$ createuser --interactive
postgres@ws:~$ createdb minimal_db
gontcharovd@ws:~$ sudo adduser minimal_db
这是数据库的连接信息:
minimal_db=# \conninfo
You are connected to database "minimal_db" as user "minimal_db" via socket in "/var/run/postgresql" at port "5432".
我不知道插座是什么/var/run/postgresql
意思。
我创建了一个名为my_table
并插入一个值的表:
minimal_db=# CREATE TABLE my_table (my_value INT NOT NULL);
minimal_db=# INSERT INTO my_table VALUES (123);
我检查了这个值是否存在于my_table
:
minimal_db=# SELECT * FROM my_table;
my_value
----------
123
(1 row)
postgres 服务器建议它接受连接:
gontcharovd@ws:~$ pg_isready -h localhost -p 5432
localhost:5432 - accepting connections
2. 最小的 Airflow DAG
我使用的 DAG 绝对是最小的:它每小时将值 123 写入my_table
in minimal_db
。
import airflow
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
dag = DAG(
'minimal_example',
start_date=airflow.utils.dates.days_ago(1),
schedule_interval='@hourly'
)
write_to_postgres = PostgresOperator(
task_id='write_to_postgres',
postgres_conn_id='minimal_db_id',
sql='INSERT INTO my_table VALUES (123);',
dag=dag
)
write_to_postgres
3. 在 Admin 选项卡中创建的 Airflow 连接
这是postgres_conn_id
我在 DAG 中使用的字段:
Conn Id: minimal_db_id
Con Type: Postgres
Host: localhost
Schema: minimal_db
Login: minimal_db
Password: [same password as the minimal_db role]
Port: 5432
4. write_to_postgres 任务出错
即使我确定 postgresql 角色的密码与连接minimal_db
中的密码匹配,我仍不断收到密码身份验证失败。minimal_db_id
[2020-06-05 20:42:51,845] {taskinstance.py:900} INFO - Executing <Task(PostgresOperator): write_to_postgres> on 2020-06-03T00:00:00+00:00
[2020-06-05 20:42:51,847] {standard_task_runner.py:53} INFO - Started process 59141 to run task
[2020-06-05 20:42:51,875] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: minimal_example.write_to_postgres 2020-06-03T00:00:00+00:00 [running]> ws
[2020-06-05 20:42:51,882] {postgres_operator.py:62} INFO - Executing: INSERT INTO my_table VALUES (123);
[2020-06-05 20:42:51,884] {logging_mixin.py:112} INFO - [2020-06-05 20:42:51,884] {base_hook.py:87} INFO - Using connection to: id: minimal_db_id. Host: localhost, Port: 5432, Schema: minimal_db, Login: minimal_db, Password: XXXXXXXX, extra: None
[2020-06-05 20:42:51,892] {taskinstance.py:1145} ERROR - FATAL: password authentication failed for user "minimal_db"
FATAL: password authentication failed for user "minimal_db"
Traceback (most recent call last):
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
result = task_copy.execute(context=context)
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/operators/postgres_operator.py", line 65, in execute
self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/hooks/dbapi_hook.py", line 162, in run
with closing(self.get_conn()) as conn:
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/hooks/postgres_hook.py", line 93, in get_conn
self.conn = psycopg2.connect(**conn_args)
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/psycopg2/__init__.py", line 127, in connect
conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: FATAL: password authentication failed for user "minimal_db"
FATAL: password authentication failed for user "minimal_db"
[2020-06-05 20:42:51,894] {taskinstance.py:1189} INFO - Marking task as FAILED.dag_id=minimal_example, task_id=write_to_postgres, execution_date=20200603T000000, start_date=20200605T184251, end_date=20200605T184251
[2020-06-05 20:43:01,840] {logging_mixin.py:112} INFO - [2020-06-05 20:43:01,840] {local_task_job.py:103} INFO - Task exited with return code 1
我试图将 minimum_db_id 定义中的 Host 字段更改为/var/run/postgresql
. 这会导致对等身份验证失败,而不是密码身份验证失败:
*** Reading local file: /home/gontcharovd/airflow/logs/minimal_example/write_to_postgres/2020-06-04T18:00:00+00:00/2.log
[2020-06-05 21:25:36,787] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: minimal_example.write_to_postgres 2020-06-04T18:00:00+00:00 [queued]>
[2020-06-05 21:25:36,793] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: minimal_example.write_to_postgres 2020-06-04T18:00:00+00:00 [queued]>
[2020-06-05 21:25:36,793] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-06-05 21:25:36,793] {taskinstance.py:880} INFO - Starting attempt 2 of 2
[2020-06-05 21:25:36,793] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-06-05 21:25:36,798] {taskinstance.py:900} INFO - Executing <Task(PostgresOperator): write_to_postgres> on 2020-06-04T18:00:00+00:00
[2020-06-05 21:25:36,800] {standard_task_runner.py:53} INFO - Started process 64696 to run task
[2020-06-05 21:25:36,836] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: minimal_example.write_to_postgres 2020-06-04T18:00:00+00:00 [running]> ws
[2020-06-05 21:25:36,843] {postgres_operator.py:62} INFO - Executing: INSERT INTO my_table VALUES (123);
[2020-06-05 21:25:36,846] {logging_mixin.py:112} INFO - [2020-06-05 21:25:36,846] {base_hook.py:87} INFO - Using connection to: id: minimal_db_id. Host: /var/run/postgresql/, Port: 5432, Schema: minimal_db, Login: minimal_db, Password: XXXXXXXX, extra: None
[2020-06-05 21:25:36,848] {taskinstance.py:1145} ERROR - FATAL: Peer authentication failed for user "minimal_db"
Traceback (most recent call last):
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
result = task_copy.execute(context=context)
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/operators/postgres_operator.py", line 65, in execute
self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/hooks/dbapi_hook.py", line 162, in run
with closing(self.get_conn()) as conn:
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/hooks/postgres_hook.py", line 93, in get_conn
self.conn = psycopg2.connect(**conn_args)
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/psycopg2/__init__.py", line 127, in connect
conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: FATAL: Peer authentication failed for user "minimal_db"
[2020-06-05 21:25:36,850] {taskinstance.py:1189} INFO - Marking task as FAILED.dag_id=minimal_example, task_id=write_to_postgres, execution_date=20200604T180000, start_date=20200605T192536, end_date=20200605T192536
[2020-06-05 21:25:46,786] {logging_mixin.py:112} INFO - [2020-06-05 21:25:46,785] {local_task_job.py:103} INFO - Task exited with return code 1
我不知道我还能尝试什么。
解决方案
我找到了问题和解决方案。
我的错误是我将 Linux 用户的密码minimal_db
与 postgres 角色混淆了minimal_db
。我删除了 postgres 角色并使用显式密码重新创建了它:
CREATE USER minimal_db WITH PASSWORD '123';
我在气流网络服务器的管理选项卡的连接字段中指定了这个密码。现在这些值已正确写入数据库。
推荐阅读
- shell - 如何在 shell 脚本中使用二进制数?
- python - 将 argparse 参数分组到(子)字典中
- javascript - 循环中的窗口,并在事件发生时将唯一参数传递给每个窗口。打开加载
- css - CSS - 动态改变字体大小以适应容器的高度
- compilation - 在 Gatling 中运行录制的模拟时,出现错误
- android - ionic 4 菜单不适用于 android 移动应用程序,但适用于桌面
- flowtype - 流动。如何根据函数参数指定联合类型
- java - CloseableHttpAsyncClient 不发出http请求
- angular - 如何以角度制作更新表格?
- c++ - 如何获得两个 std::map 的公共键?