首页 > 解决方案 > 以编程方式在 dockerized apache 气流 python 操作符内创建 SSH 隧道

问题描述

我的程序在运行 apache 气流的 docker 容器内时无法创建 SSH 隧道。只有在我的本地机器上运行该功能才能正常工作。我有一个服务器列表,用于创建隧道、查询数据库和关闭连接。通常,我会这样做:

for server in servers:
    server_conn = sshtunnel.SSHTunnelForwarder(
        server,
        ssh_username=ssh_user,
        ssh_password=ssh_password,
        remote_bind_address=(localhost, db_port),
        local_bind_address=(localhost, localport)
    )

这按预期工作,我可以从那里做任何我需要的事情。但是,在 Docker 中,它不起作用。我意识到 docker 运行并绑定到一个端口,实际上并不独立于主机系统,所以我曾经network_mode="host"帮助缓解这个问题。但是,这不起作用,因为我的容器失去了相互通信的能力。这是我的 docker-compose 文件

    postgres:
        image: postgres:9.6
        environment:
            - POSTGRES_USER=airflow
            - POSTGRES_PASSWORD=airflow
            - POSTGRES_DB=airflow
            - PGDATA=/var/lib/postgresql/data/pgdata
        volumes:
            - ~/.whale/pgdata:/var/lib/postgresql/data/pgdata
            - ./dags/docker/sql/create.sql:/docker-entrypoint-initdb.d/init.sql
        ports:
          - "5432:5432"

    webserver:
        image: hawk
        build:
            context: .
            dockerfile: ./dags/docker/Dockerfile-airflow
        restart: always
        depends_on:
            - postgres
            # - redis
        environment:
            - LOAD_EX=n
            - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
            - EXECUTOR=Local
        volumes:
            - ./dags:/usr/local/airflow/dags
            # Uncomment to include custom plugins
            # - ./plugins:/usr/local/airflow/plugins
        ports:
            - "8080:8080"
            - "52023:22"
        command: webserver
        healthcheck:
            test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
            interval: 30s
            timeout: 30s
            retries: 3

我还按照此处docker exec的说明进行操作,达到了可以进入容器并手动键入上述 python 片段并获得工作连接的地步。此外,我在这里阅读了气流文档,其中涵盖了 SSH 连接运算符,但那些仅支持 bash 命令,我需要运行我的 python 函数。我真的很困惑为什么 python 代码在exec-ed 进入系统时会起作用,但当我通过气流 DAG 运行它时却不会。目前,我无法手动放入所有连接,因为一旦部署此系统,将有 > 100 个连接。任何帮助将不胜感激。如果需要更多深度,请告诉我。

标签: pythondockerdocker-composeairflowssh-tunnel

解决方案


我在打开隧道并尝试在单独的任务中连接到数据库时遇到了同样的问题,但是通过在同一个任务中执行这两个任务来让它工作(Airflow 在任务运行之间不会保持状态):

def select_from_tunnel_db():
    # Open SSH tunnel
    ssh_hook = SSHHook(ssh_conn_id='bastion-ssh-conn', keepalive_interval=60)
    tunnel = ssh_hook.get_tunnel(5432, remote_host='<db_host>', local_port=5432)
    tunnel.start()

    # Connect to DB and run query
    pg_hook = PostgresHook(
        postgres_conn_id='remote-db-conn',  # NOTE: host='localhost'
        schema='db_schema'
    )
    pg_cursor = pg_hook.get_conn().cursor()
    pg_cursor.execute('SELECT * FROM table;')
    select_val = pg_cursor.fetchall()

    return select_val


python_operator = PythonOperator(
    task_id='test_tunnel_conn',
    python_callable=select_from_tunnel_db,
    dag=dag
)

这会将端口 5432 上的流量从本地机器转发到远程数据库主机上的同一端口。SSHHook 需要与您将通过隧道通过的端点建立有效的 ssh 连接,而 PostgresHook 需要与端口 5432 上的“localhost”建立 postgres 连接。


推荐阅读