python - 如何在 Google Cloud Dataflow 中设置到外部数据库服务器的 SSH 隧道?
问题描述
我在使用 DataflowRunner 使我的 Apache Beam 管道在 Cloud Dataflow 上工作时遇到问题。
管道的第一步是连接到托管在 VM 上的外部 Postgresql 服务器,该服务器只能通过 SSH 外部访问,端口 22,并提取一些数据。我无法更改这些防火墙规则,因此我只能通过SSH 隧道(即端口转发)连接到数据库服务器。
在我的代码中,我使用了 python 库 sshtunnel。当使用DirectRunner从我的开发计算机启动管道时,它可以完美运行:
from sshtunnel import open_tunnel
with open_tunnel(
(user_options.ssh_tunnel_host, user_options.ssh_tunnel_port),
ssh_username=user_options.ssh_tunnel_user,
ssh_password=user_options.ssh_tunnel_password,
remote_bind_address=(user_options.dbhost, user_options.dbport)
) as tunnel:
with beam.Pipeline(options=pipeline_options) as p:
(p | "Read data" >> ReadFromSQL(
host=tunnel.local_bind_host,
port=tunnel.local_bind_port,
username=user_options.dbusername,
password=user_options.dbpassword,
database=user_options.dbname,
wrapper=PostgresWrapper,
query=select_query
)
| "Format CSV" >> DictToCSV(headers)
| "Write CSV" >> WriteToText(user_options.export_location)
)
在非默认 VPC 中使用DataflowRunner启动的相同代码,其中所有入口都被拒绝但没有出口限制,并且配置了 CloudNAT,但失败并显示以下消息:
psycopg2.OperationalError:无法连接到服务器:连接被拒绝服务器是否在主机“0.0.0.0”上运行并接受端口 41697 上的 TCP/IP 连接?[运行“读取数据/读取”时]
所以,很明显我的隧道出了点问题,但我无法确定到底是什么。我开始怀疑是否可以通过 CloudNAT 直接设置 SSH 隧道,直到我发现这篇博文:https ://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-用例模式第 1 部分说明:
Cloud Dataflow 的核心优势在于您可以调用外部服务来丰富数据。例如,您可以调用微服务来获取元素的附加数据。在 DoFn 中,调用服务(通常通过 HTTP 完成)。只要您在项目/网络中设置的防火墙规则允许,您就可以完全控制您选择的任何类型的连接。
所以应该可以建立这个隧道!我不想放弃,但我不知道下一步该尝试什么。任何想法 ?
谢谢阅读
解决方案
问题解决了 !我不敢相信我已经花了整整两天的时间……我完全看错了方向。
问题不在于某些 Dataflow 或 GCP 网络配置,据我所知......
您可以完全控制您选择的任何类型的连接,只要您在项目/网络中设置的防火墙规则允许它
是真的。
问题当然出在我的代码中:只有在分布式环境中才发现问题。我犯了从主管道处理器而不是工人打开隧道的错误。所以 SSH 隧道启动了,但不是在工作人员和目标服务器之间,而是在主管道和目标服务器之间!
为了解决这个问题,我不得不更改我的请求 DoFn 以使用隧道包装查询执行:
class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""
def __init__(self, *args, **kwargs):
self.dbport = kwargs["port"]
self.dbhost = kwargs["host"]
self.args = args
self.kwargs = kwargs
super().__init__(*args, **kwargs)
def process(self, query, *args, **kwargs):
# Remote side of the SSH Tunnel
remote_address = (self.dbhost, self.dbport)
ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
with open_tunnel(
ssh_tunnel,
ssh_username=self.kwargs["ssh_user"],
ssh_password=self.kwargs["ssh_password"],
remote_bind_address=remote_address,
set_keepalive=10.0
) as tunnel:
forwarded_port = tunnel.local_bind_port
self.kwargs["port"] = forwarded_port
source = sql.SQLSource(*self.args, **self.kwargs)
sql.SQLSouceInput._build_value(source, source.runtime_params)
logging.info("Processing - {}".format(query))
for records, schema in source.client.read(query):
for row in records:
yield source.client.row_as_dict(row, schema)
如您所见,我不得不重写一些 pysql_beam 库。
最后,每个工作人员为每个请求打开自己的隧道。可能可以优化此行为,但这足以满足我的需求。
推荐阅读
- jms - MQException:MQJE001:尝试从队列中检索消息时完成代码“2”,原因“2033”
- function - POWERSHELL - 检查 samaccountname 是否存在,如果是,则使用不同的方法
- postgresql - 当原始 SQL 工作时,TypeORM 查询生成器返回空数组
- linux - 为什么我不能在 bash 中杀死一个进程?
- python-3.x - Separating elements returned by xpath with commas
- c++ - C++ 从元组中获取值
...> - c# - 如何将 IFormfile XML 数据转换为 .Net Core 中的列表
- java - 尝试从 word doc 获取特定数据时获取 ArrayIndexOutofBoundException
- django - DRF如何通过外键公司过滤用户和产品?
- python - flask restful:如何将 python 函数作为参数传递来触发 GET 请求?