首页 > 解决方案 > 如何在 Google Cloud Dataflow 中设置到外部数据库服务器的 SSH 隧道?

问题描述

我在使用 DataflowRunner 使我的 Apache Beam 管道在 Cloud Dataflow 上工作时遇到问题。

管道的第一步是连接到托管在 VM 上的外部 Postgresql 服务器,该服务器只能通过 SSH 外部访问,端口 22,并提取一些数据。我无法更改这些防火墙规则,因此我只能通过SSH 隧道(即端口转发)连接到数据库服务器。

在我的代码中,我使用了 python 库 sshtunnel。当使用DirectRunner从我的开发计算机启动管道时,它可以完美运行:

from sshtunnel import open_tunnel

with open_tunnel(
        (user_options.ssh_tunnel_host, user_options.ssh_tunnel_port),
        ssh_username=user_options.ssh_tunnel_user,
        ssh_password=user_options.ssh_tunnel_password,
        remote_bind_address=(user_options.dbhost, user_options.dbport)
    ) as tunnel:
        with beam.Pipeline(options=pipeline_options) as p:
            (p | "Read data" >> ReadFromSQL(
                host=tunnel.local_bind_host,
                port=tunnel.local_bind_port,
                username=user_options.dbusername,
                password=user_options.dbpassword,
                database=user_options.dbname,
                wrapper=PostgresWrapper,
                query=select_query
            )
                | "Format CSV" >> DictToCSV(headers)
                | "Write CSV" >> WriteToText(user_options.export_location)
            )

在非默认 VPC 中使用DataflowRunner启动的相同代码,其中所有入口都被拒绝但没有出口限制,并且配置了 CloudNAT,但失败并显示以下消息:

psycopg2.OperationalError:无法连接到服务器:连接被拒绝服务器是否在主机“0.0.0.0”上运行并接受端口 41697 上的 TCP/IP 连接?[运行“读取数据/读取”时]

所以,很明显我的隧道出了点问题,但我无法确定到底是什么。我开始怀疑是否可以通过 CloudNAT 直接设置 SSH 隧道,直到我发现这篇博文:https ://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-用例模式第 1 部分说明:

Cloud Dataflow 的核心优势在于您可以调用外部服务来丰富数据。例如,您可以调用微服务来获取元素的附加数据。在 DoFn 中,调用服务(通常通过 HTTP 完成)。只要您在项目/网络中设置的防火墙规则允许,您就可以完全控制您选择的任何类型的连接。

所以应该可以建立这个隧道!我不想放弃,但我不知道下一步该尝试什么。任何想法 ?

谢谢阅读

标签: pythonpostgresqlapache-beamdataflowssh-tunnel

解决方案


问题解决了 !我不敢相信我已经花了整整两天的时间……我完全看错了方向。

问题不在于某些 Dataflow 或 GCP 网络配置,据我所知......

您可以完全控制您选择的任何类型的连接,只要您在项目/网络中设置的防火墙规则允许它

是真的。

问题当然出在我的代码中:只有在分布式环境中才发现问题。我犯了从主管道处理器而不是工人打开隧道的错误。所以 SSH 隧道启动了,但不是在工作人员和目标服务器之间,而是在主管道和目标服务器之间!

为了解决这个问题,我不得不更改我的请求 DoFn 以使用隧道包装查询执行:

class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""

def __init__(self, *args, **kwargs):
    self.dbport = kwargs["port"]
    self.dbhost = kwargs["host"]
    self.args = args
    self.kwargs = kwargs
    super().__init__(*args, **kwargs)

def process(self, query, *args, **kwargs):
    # Remote side of the SSH Tunnel
    remote_address = (self.dbhost, self.dbport)
    ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
    with open_tunnel(
        ssh_tunnel,
        ssh_username=self.kwargs["ssh_user"],
        ssh_password=self.kwargs["ssh_password"],
        remote_bind_address=remote_address,
        set_keepalive=10.0
    ) as tunnel:
        forwarded_port = tunnel.local_bind_port
        self.kwargs["port"] = forwarded_port
        source = sql.SQLSource(*self.args, **self.kwargs)
        sql.SQLSouceInput._build_value(source, source.runtime_params)
        logging.info("Processing - {}".format(query))
        for records, schema in source.client.read(query):
            for row in records:
                yield source.client.row_as_dict(row, schema)

如您所见,我不得不重写一些 pysql_beam 库。

最后,每个工作人员为每个请求打开自己的隧道。可能可以优化此行为,但这足以满足我的需求。


推荐阅读