postgresql - 无法使用托管在 kubernetes 中的 Pyspark 从 PSQL 读取
问题描述
我已经在 Kubernetes 中部署了 pyspark 3.0.1。
我在 jupyter notebook 中使用考拉来执行一些转换,我需要在 Azure Database for PostgreSQL 中进行写入和读取。
我可以使用以下代码从 pandas 中读取它:
from sqlalchemy import create_engine
import psycopg2
import pandas
uri = 'postgres+psycopg2://<postgreuser>:<postgrepassword>@<server>:5432/<database>'
engine_azure = create_engine(uri, echo=False)
df = pdf.read_sql_query(f"select * from public.<table>", con=engine_azure)
我想使用以下代码从 Pyspark 读取此表:
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import databricks.koalas as ks
from s3fs import S3FileSystem
import datetime
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.hadoop:hadoop-aws:2.7.3,org.postgresql:postgresql:42.1.1 pyspark-shell pyspark-shell"
os.environ['PYSPARK_SUBMIT_ARGS2'] = "--packages org.postgresql:postgresql:42.1.1 pyspark-shell"
sparkClassPath = os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.postgresql:postgresql:42.1.1 pyspark-shell'
# Create Spark config for our Kubernetes based cluster manager
sparkConf = SparkConf()
sparkConf.setMaster("k8s://https://kubernetes.default.svc.cluster.local:443")
sparkConf.setAppName("spark")
sparkConf.set("spark.kubernetes.container.image", "<image>")
sparkConf.set("spark.kubernetes.namespace", "spark")
sparkConf.set("spark.executor.instances", "3")
sparkConf.set("spark.executor.cores", "2")
sparkConf.set("spark.driver.memory", "2000m")
sparkConf.set("spark.executor.memory", "2000m")
sparkConf.set("spark.kubernetes.pyspark.pythonVersion", "3")
sparkConf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
sparkConf.set("spark.kubernetes.authenticate.serviceAccountName", "spark")
sparkConf.set("spark.driver.port", "29414")
sparkConf.set("spark.driver.host", "<deployment>.svc.cluster.local")
sparkConf.set("spark.driver.extraClassPath", sparkClassPath)
# Initialize our Spark cluster, this will actually
# generate the worker nodes.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext
df3 = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://<host>:5432/<database>") \
.option("driver", "org.postgresql.Driver") \
.option("dbtable", "select * from public.<table>") \
.option("user", "<user>") \
.option("password", "<password>") \
.load()
但我收到此错误:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-5-a529178ed9a0> in <module>
1 url = 'jdbc:postgresql://psql-mcf-prod1.postgres.database.azure.com:5342/cpke-prod'
2 properties = {'user': 'adminmcfpsql@psql-mcf-prod1.postgres.database.azure.com', 'password': '4vb44B^V8w2D*q!eQZgl',"driver": "org.postgresql.Driver"}
----> 3 df3 = spark.read.jdbc(url=url, table='select * from public.userinput_write_offs where reversed_date is NULL', properties=properties)
/usr/local/spark/python/pyspark/sql/readwriter.py in jdbc(self, url, table, column, lowerBound, upperBound, numPartitions, predicates, properties)
629 jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates)
630 return self._df(self._jreader.jdbc(url, table, jpredicates, jprop))
--> 631 return self._df(self._jreader.jdbc(url, table, jprop))
632
633
/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
126 def deco(*a, **kw):
127 try:
--> 128 return f(*a, **kw)
129 except py4j.protocol.Py4JJavaError as e:
130 converted = convert_exception(e.java_exception)
/usr/local/lib/python3.7/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o89.jdbc.
: org.postgresql.util.PSQLException: The connection attempt failed.
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:275)
at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:194)
at org.postgresql.Driver.makeConnection(Driver.java:450)
at org.postgresql.Driver.connect(Driver.java:252)
at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$createConnectionFactory$1(JdbcUtils.scala:64)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:221)
at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:312)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at org.postgresql.core.PGStream.<init>(PGStream.java:68)
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:144)
... 27 more
解决方案
您的端口号不正确 - 它应该是 5432,而不是 5342。因此您的连接超时。如果换行
.option("url", "jdbc:postgresql://<host>:5342/<database>")
到
.option("url", "jdbc:postgresql://<host>:5432/<database>")
也许它会解决你的问题。
推荐阅读
- data-mining - 绘制与下面给出的 CART 分区对应的树
- android - ListView with Strings:无法让它更新按钮的文本
- sql - 如何将键值数组转换为 BigQuery / GoogleSQL 中的列?
- prisma - 在 Prisma 中执行此关系“事务”的更好方法
- session - 如何在与 user:pass auth 的会话中将代理添加到 Praw
- android - 在 Surface、SurfaceControl、WindowManager 和 OpenGL 的上下文中,布局如何在 Android 中呈现在屏幕上?
- c# - 将参数传递给 window.location.href 失败
- android - 谷歌登录抛出 com.google.android.gms.common.api.ApiException: 10
- java - 在同一场景中在移动和桌面浏览器/驱动程序之间切换 - java selenium cucumber
- node.js - 使用 React、Express 获取 404 onSubmit