python - 以编程方式在 dockerized apache 气流 python 操作符内创建 SSH 隧道
问题描述
我的程序在运行 apache 气流的 docker 容器内时无法创建 SSH 隧道。只有在我的本地机器上运行该功能才能正常工作。我有一个服务器列表,用于创建隧道、查询数据库和关闭连接。通常,我会这样做:
for server in servers:
server_conn = sshtunnel.SSHTunnelForwarder(
server,
ssh_username=ssh_user,
ssh_password=ssh_password,
remote_bind_address=(localhost, db_port),
local_bind_address=(localhost, localport)
)
这按预期工作,我可以从那里做任何我需要的事情。但是,在 Docker 中,它不起作用。我意识到 docker 运行并绑定到一个端口,实际上并不独立于主机系统,所以我曾经network_mode="host"
帮助缓解这个问题。但是,这不起作用,因为我的容器失去了相互通信的能力。这是我的 docker-compose 文件
postgres:
image: postgres:9.6
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
- PGDATA=/var/lib/postgresql/data/pgdata
volumes:
- ~/.whale/pgdata:/var/lib/postgresql/data/pgdata
- ./dags/docker/sql/create.sql:/docker-entrypoint-initdb.d/init.sql
ports:
- "5432:5432"
webserver:
image: hawk
build:
context: .
dockerfile: ./dags/docker/Dockerfile-airflow
restart: always
depends_on:
- postgres
# - redis
environment:
- LOAD_EX=n
- FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- EXECUTOR=Local
volumes:
- ./dags:/usr/local/airflow/dags
# Uncomment to include custom plugins
# - ./plugins:/usr/local/airflow/plugins
ports:
- "8080:8080"
- "52023:22"
command: webserver
healthcheck:
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
interval: 30s
timeout: 30s
retries: 3
我还按照此处docker exec
的说明进行操作,达到了可以进入容器并手动键入上述 python 片段并获得工作连接的地步。此外,我在这里阅读了气流文档,其中涵盖了 SSH 连接运算符,但那些仅支持 bash 命令,我需要运行我的 python 函数。我真的很困惑为什么 python 代码在exec
-ed 进入系统时会起作用,但当我通过气流 DAG 运行它时却不会。目前,我无法手动放入所有连接,因为一旦部署此系统,将有 > 100 个连接。任何帮助将不胜感激。如果需要更多深度,请告诉我。
解决方案
我在打开隧道并尝试在单独的任务中连接到数据库时遇到了同样的问题,但是通过在同一个任务中执行这两个任务来让它工作(Airflow 在任务运行之间不会保持状态):
def select_from_tunnel_db():
# Open SSH tunnel
ssh_hook = SSHHook(ssh_conn_id='bastion-ssh-conn', keepalive_interval=60)
tunnel = ssh_hook.get_tunnel(5432, remote_host='<db_host>', local_port=5432)
tunnel.start()
# Connect to DB and run query
pg_hook = PostgresHook(
postgres_conn_id='remote-db-conn', # NOTE: host='localhost'
schema='db_schema'
)
pg_cursor = pg_hook.get_conn().cursor()
pg_cursor.execute('SELECT * FROM table;')
select_val = pg_cursor.fetchall()
return select_val
python_operator = PythonOperator(
task_id='test_tunnel_conn',
python_callable=select_from_tunnel_db,
dag=dag
)
这会将端口 5432 上的流量从本地机器转发到远程数据库主机上的同一端口。SSHHook 需要与您将通过隧道通过的端点建立有效的 ssh 连接,而 PostgresHook 需要与端口 5432 上的“localhost”建立 postgres 连接。
推荐阅读
- flutter - 即使在默认的默认 Flutter Web 项目中也出现错误“FormatException:意外字符(在字符 1)”
- python - 将列添加到扩展的 `pandas.DataFrame` 类
- amazon-cloudformation - 将 Yaml cloudformation 模板转换为 Json
- r - 如何从 R 中的 plot() 中提取散点图的数据
- mysql - MYSQL 根据 CASE 语句选择列
- python - Python C 扩展模块中的“PYVERNODOTS”是什么意思?
- testing - TestCafe 的错误代码 E26 是什么意思?
- android - 是否可以在 android/ios 本机应用程序中运行 PWA?
- java - 如何从附加到活动的片段中显示向上按钮?
- node.js - 在浏览器中隐藏 div 元素,但使用 jsPDF 打印元素