Reading data from PostgreSQL in Spark returns py4j.protocol.Py4JJavaError

Problem description

I am trying to read data from PostgreSQL in Apache Spark using Python, but it returns py4j.protocol.Py4JJavaError.

Here is my code:

import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("PySpark Read") \
    .config("spark.jars", "C:\spark\spark-2.4.5-bin-hadoop2.7\jars\postgresql-42.2.12.jar") \
    .getOrCreate()

df = spark.read \
    .format("jdbc") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", "jdbc:postgresql://127.0.0.1:5432/db_test") \
    .option("dbtable", "test") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .load()

df.printSchema()

Here is the error log:

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/C:/spark/spark-2.4.5-bin-hadoop2.7/jars/spark-unsafe_2.11-2.4.5.jar) to method java.nio.Bits.unaligned()
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Traceback (most recent call last):
  File "C:/spark/spark-2.4.5-bin-hadoop2.7/read-rdbms.py", line 17, in <module>
    .option("password", "postgres") \
  File "C:\spark\spark-2.4.5-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\readwriter.py", line 172, in load
  File "C:\spark\spark-2.4.5-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__
  File "C:\spark\spark-2.4.5-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py", line 63, in deco
  File "C:\spark\spark-2.4.5-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o39.load.
: org.postgresql.util.PSQLException: FATAL: password authentication failed for user "postgres"
        at org.postgresql.core.v3.ConnectionFactoryImpl.doAuthentication(ConnectionFactoryImpl.java:525)
        at org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:146)
        at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:197)
        at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
        at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:211)
        at org.postgresql.Driver.makeConnection(Driver.java:459)
        at org.postgresql.Driver.connect(Driver.java:261)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:210)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:564)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:832)

By the way, I am running my PySpark script locally on my machine, and I noticed this line in the log:

FATAL: password authentication failed for user "postgres"

However, when I test a connection with the same credentials in Navicat and DBeaver, they connect without any problem. Is there something I am missing?
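
One way to narrow this down further is to try the same credentials from plain Python, outside of Spark: if that also succeeds, PostgreSQL itself accepts the credentials and the failure is specific to the Spark/JDBC/JDK stack. A minimal sketch using psycopg2 (assuming it is installed; host, database, and credentials mirror the options above):

import psycopg2

# Connect with the exact credentials the Spark job uses. If this works,
# the server accepts them and the problem lies on the Spark/JDBC side.
conn = psycopg2.connect(
    host="127.0.0.1",
    port=5432,
    dbname="db_test",
    user="postgres",
    password="postgres",
)
with conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone()[0])
conn.close()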

Tags: apache-spark, pyspark

Solution


Based on @mazaneicha's answer, it worked after I downgraded my JDK to 8u251 (my previous JDK was 14, or whatever the latest version was at the time).
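
For reference, Spark 2.4.x is documented to run on Java 8, so it helps to make sure the driver JVM actually uses JDK 8 by pointing JAVA_HOME at it before the SparkSession is created. A minimal sketch, assuming JDK 8u251 is installed under C:\Program Files\Java\jdk1.8.0_251 (the path is an assumption; adjust it to your installation):

import os

# Point Spark's launcher at a Java 8 installation before the JVM starts.
# The JDK path below is an assumption; replace it with your actual JDK 8 directory.
os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk1.8.0_251"
os.environ["PATH"] = os.path.join(os.environ["JAVA_HOME"], "bin") + os.pathsep + os.environ["PATH"]

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("PySpark Read") \
    .config("spark.jars", r"C:\spark\spark-2.4.5-bin-hadoop2.7\jars\postgresql-42.2.12.jar") \
    .getOrCreate()

# Sanity check: print the Java version the driver JVM is actually running on.
print(spark.sparkContext._jvm.System.getProperty("java.version"))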

