Unable to read from PSQL using Pyspark hosted in Kubernetes

Problem description

I have deployed PySpark 3.0.1 on Kubernetes.

I am using Koalas in a Jupyter notebook to run some transformations, and I need to read from and write to Azure Database for PostgreSQL.

I can read from it with pandas using the following code:

from sqlalchemy import create_engine
import pandas as pd

# SQLAlchemy 1.4+ requires the "postgresql" dialect name; the old "postgres" alias was removed
uri = 'postgresql+psycopg2://<postgreuser>:<postgrepassword>@<server>:5432/<database>'
engine_azure = create_engine(uri, echo=False)

df = pd.read_sql_query("select * from public.<table>", con=engine_azure)

I would like to read the same table from PySpark with the following code:

import os
from pyspark import SparkConf
from pyspark.sql import SparkSession
import databricks.koalas as ks

# --packages downloads the hadoop-aws and PostgreSQL JDBC driver JARs and puts
# them on both the driver and executor classpaths; it must be set before the
# SparkSession is created, and "pyspark-shell" must appear exactly once at the end.
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages org.apache.hadoop:hadoop-aws:2.7.3,org.postgresql:postgresql:42.1.1 pyspark-shell"

# Create the Spark config for our Kubernetes-based cluster manager
sparkConf = SparkConf()
sparkConf.setMaster("k8s://https://kubernetes.default.svc.cluster.local:443")
sparkConf.setAppName("spark")
sparkConf.set("spark.kubernetes.container.image", "<image>")
sparkConf.set("spark.kubernetes.namespace", "spark")
sparkConf.set("spark.executor.instances", "3")
sparkConf.set("spark.executor.cores", "2")
sparkConf.set("spark.driver.memory", "2000m")
sparkConf.set("spark.executor.memory", "2000m")
sparkConf.set("spark.kubernetes.pyspark.pythonVersion", "3")
sparkConf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
sparkConf.set("spark.kubernetes.authenticate.serviceAccountName", "spark")
sparkConf.set("spark.driver.port", "29414")
sparkConf.set("spark.driver.host", "<deployment>.svc.cluster.local")
# Initialize the Spark session; on Kubernetes this is what
# actually launches the executor pods.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext

# note: a query passed via "dbtable" must be a parenthesized subquery with an alias
df3 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://<host>:5342/<database>") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "(select * from public.<table>) as t") \
    .option("user", "<user>") \
    .option("password", "<password>") \
    .load()

But I get this error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-5-a529178ed9a0> in <module>
      1 url = 'jdbc:postgresql://<host>:5342/<database>'
      2 properties = {'user': '<user>', 'password': '<password>', "driver": "org.postgresql.Driver"}
----> 3 df3 = spark.read.jdbc(url=url, table='select * from public.<table> where reversed_date is NULL', properties=properties)

/usr/local/spark/python/pyspark/sql/readwriter.py in jdbc(self, url, table, column, lowerBound, upperBound, numPartitions, predicates, properties)
    629             jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates)
    630             return self._df(self._jreader.jdbc(url, table, jpredicates, jprop))
--> 631         return self._df(self._jreader.jdbc(url, table, jprop))
    632 
    633 

/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    126     def deco(*a, **kw):
    127         try:
--> 128             return f(*a, **kw)
    129         except py4j.protocol.Py4JJavaError as e:
    130             converted = convert_exception(e.java_exception)

/usr/local/lib/python3.7/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o89.jdbc.
: org.postgresql.util.PSQLException: The connection attempt failed.
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:275)
    at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
    at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:194)
    at org.postgresql.Driver.makeConnection(Driver.java:450)
    at org.postgresql.Driver.connect(Driver.java:252)
    at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$createConnectionFactory$1(JdbcUtils.scala:64)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:221)
    at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:312)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: connect timed out
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at org.postgresql.core.PGStream.<init>(PGStream.java:68)
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:144)
    ... 27 more

Tags: postgresql, apache-spark, kubernetes, jdbc, pyspark

Solution


Your port number is incorrect: it should be 5432, not 5342, which is why the connection attempt times out. If you change the line

.option("url", "jdbc:postgresql://<host>:5342/<database>")

to

.option("url", "jdbc:postgresql://<host>:5432/<database>")

it should fix your problem.
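
For reference, here is a minimal sketch of the full corrected read; only the port differs from the snippet in the question, and the <host>, <database>, <user>, <password>, and <table> placeholders are assumed to match your environment:

df3 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://<host>:5432/<database>") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "(select * from public.<table>) as t") \
    .option("user", "<user>") \
    .option("password", "<password>") \
    .load()

Alternatively, on Spark 3.x you can pass the bare SELECT through the "query" option instead of wrapping it in a parenthesized subquery for "dbtable".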

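If you want to rule out networking issues before involving Spark at all, a quick check is to open a raw TCP socket from the driver pod to the database host and port. This is a minimal sketch using only the Python standard library; the host is a placeholder:

import socket

host, port = "<host>", 5432  # placeholder for your Azure PostgreSQL server

try:
    # create_connection raises OSError on DNS failure, refusal, or timeout
    with socket.create_connection((host, port), timeout=5):
        print(f"{host}:{port} is reachable")
except OSError as exc:
    print(f"cannot reach {host}:{port}: {exc}")

The SocketTimeoutException in the traceback above is exactly this kind of failure: the TCP handshake never completes because nothing is listening on port 5342.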
