Unable to connect to MongoDB over SSL using PySpark

Problem description

PySpark version: 2.3.4; MongoDB: 4.2

I have set up SSL for my MongoDB instance, and now I am trying to connect to it over SSL from PySpark.

My sample code:

from pyspark.sql import SparkSession

my_spark = SparkSession \
    .builder \
    .appName("mySparkMongoJob") \
    .config("spark.mongodb.input.uri", "mongodb://admin:password@www.mongod.com:27017/db_name.collection?authSource=admin&ssl=true") \
    .getOrCreate()

df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

df.show()
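Because the SSL flag travels inside the connection URI, a typo or an unescaped character in the password can silently break it. A small stdlib sketch (the helper name and defaults are illustrative, not part of the connector) that assembles the URI and percent-encodes the credentials:

```python
from urllib.parse import quote_plus

def mongo_uri(user, password, host, port, db, coll,
              auth_source="admin", use_ssl=True):
    # Percent-encode credentials so characters like '@' or ':' in the
    # password cannot corrupt the URI.
    creds = f"{quote_plus(user)}:{quote_plus(password)}"
    ssl_flag = "true" if use_ssl else "false"
    return (f"mongodb://{creds}@{host}:{port}/{db}.{coll}"
            f"?authSource={auth_source}&ssl={ssl_flag}")

# Reproduces the URI used in the SparkSession config above:
print(mongo_uri("admin", "password", "www.mongod.com", 27017,
                "db_name", "collection"))
```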

Then I ran:

keytool -import -file /opt/certs/mdb.crt -alias mongodb -keystore /usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/security/cacerts

Then I followed the steps in this link (How to generate keystore and truststore) to generate a keystore and a truststore.

When I run the script with:

spark-submit \
--packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.2 \
--conf spark.executor.extraJavaOptions="Djavax.net.ssl.trustStore=/home/svr_data_analytic/trustStore.jks -Djavax.net.ssl.trustStorePassword=changeit" \
--conf spark.driver.extraJavaOptions="-Djavax.net.ssl.trustStore=/home/svr_data_analytic//trustStore.jks -Djavax.net.ssl.trustStorePassword=changeit" \
--conf spark.executor.extraJavaOptions="-Djavax.net.ssl.keyStore=/home/svr_data_analytic/KeyStore.jks -Djavax.net.ssl.keyStorePassword=changeit"\
test.py

I get this error:

2021-01-29 14:06:50 INFO  SparkContext:54 - Created broadcast 0 from broadcast at MongoSpark.scala:542
2021-01-29 14:06:50 INFO  cluster:71 - Cluster created with settings {hosts=[www.mongod.com:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
2021-01-29 14:06:50 INFO  cluster:71 - Cluster description not yet available. Waiting for 30000 ms before timing out
2021-01-29 14:06:50 INFO  cluster:76 - Exception in monitor thread while connecting to server www.mongod.com:27017
com.mongodb.MongoSocketReadException: Prematurely reached end of stream
    at com.mongodb.internal.connection.SocketStream.read(SocketStream.java:92)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:554)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:425)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:289)
    at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:255)
    at com.mongodb.internal.connection.CommandHelper.sendAndReceive(CommandHelper.java:83)
    at com.mongodb.internal.connection.CommandHelper.executeCommand(CommandHelper.java:33)
    at com.mongodb.internal.connection.InternalStreamConnectionInitializer.initializeConnectionDescription(InternalStreamConnectionInitializer.java:106)
    at com.mongodb.internal.connection.InternalStreamConnectionInitializer.initialize(InternalStreamConnectionInitializer.java:63)
    at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:127)
    at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:117)
    at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
  File "/home/svr_data_analytic/hmis-analytics-data-processing/src/main/python/scripts/test.py", line 62, in <module>
    main()
  File "/home/svr_data_analytic/hmis-analytics-data-processing/src/main/python/scripts/test.py", line 50, in main
    df = processing(spark)
  File "/home/svr_data_analytic/hmis-analytics-data-processing/src/main/python/scripts/test.py", line 12, in processing
    .option('uri', '{}/{}.{}?authSource={}&ssl=true'.format('mongodb://admin:password@www.mongod.com:27017', 'smarthis_prod', 'IpAppointment', 'admin')).load()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o39.load.
: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=www.mongod.com:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
    at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:179)
    at com.mongodb.internal.connection.SingleServerCluster.getDescription(SingleServerCluster.java:41)
    at com.mongodb.client.internal.MongoClientDelegate.getServerAddressList(MongoClientDelegate.java:116)
    at com.mongodb.Mongo.getServerAddressList(Mongo.java:401)
    at com.mongodb.spark.connection.MongoClientCache$$anonfun$logClient$1.apply(MongoClientCache.scala:161)
    at com.mongodb.spark.connection.MongoClientCache$$anonfun$logClient$1.apply(MongoClientCache.scala:161)
    at com.mongodb.spark.LoggingTrait$class.logInfo(LoggingTrait.scala:48)
    at com.mongodb.spark.Logging.logInfo(Logging.scala:24)
    at com.mongodb.spark.connection.MongoClientCache.logClient(MongoClientCache.scala:161)
    at com.mongodb.spark.connection.MongoClientCache.acquire(MongoClientCache.scala:56)
    at com.mongodb.spark.MongoConnector.acquireClient(MongoConnector.scala:239)
    at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:152)
    at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
    at com.mongodb.spark.MongoConnector.hasSampleAggregateOperator(MongoConnector.scala:234)
    at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator$lzycompute(MongoRDD.scala:217)
    at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator(MongoRDD.scala:217)
    at com.mongodb.spark.sql.MongoInferSchema$.apply(MongoInferSchema.scala:68)
    at com.mongodb.spark.sql.DefaultSource.constructRelation(DefaultSource.scala:97)
    at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:50)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:341)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

2021-01-29 14:07:20 INFO  SparkContext:54 - Invoking stop() from shutdown hook

Can anyone tell me where I went wrong? I'm actually new to this; I put it all together after some research on Google. Thanks in advance.

Tags: python, mongodb, apache-spark, ssl, pyspark

Solution


Check the server logs to find out why the connection was closed.

The most likely causes:

  • You did not provide a client certificate, and the server is configured to verify client certificates
  • You provided a certificate, but the server cannot validate it because it does not have the correct CA certificate
  • You are not actually using TLS
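One way to narrow down "Prematurely reached end of stream" (which typically means the TLS handshake failed or the server closed the connection) is to attempt a bare TLS handshake outside Spark. A Python sketch, reusing the host and CA file from the question (paths and hostname are the asker's; adjust to your setup):

```python
import socket
import ssl

def tls_probe(host, port, cafile=None, timeout=5):
    """Attempt a TLS handshake against host:port and return the
    negotiated protocol version. Raises OSError/ssl.SSLError on failure."""
    ctx = ssl.create_default_context(cafile=cafile)
    with socket.create_connection((host, port), timeout=timeout) as sock:
        with ctx.wrap_socket(sock, server_hostname=host) as tls:
            return tls.version()

# Against the server from the question:
# print(tls_probe("www.mongod.com", 27017, cafile="/opt/certs/mdb.crt"))
```

If the handshake fails with a certificate error, the CA file does not match the server's certificate chain; if the connection is reset immediately, check whether the server requires client certificates or is not serving TLS on that port at all.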
