Connection timeout exception when calling MongoDB Atlas from AWS EMR

Problem description

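I am new to AWS EMR, and I am trying to connect my AWS EMR cluster to my MongoDB Atlas database using the Mongo Spark connector. I keep getting a timeout error when I try to connect. Here are the steps I have followed: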

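I used the connection string given in the MongoDB dashboard (mongodb+srv://:@news.qd3xn.mongodb.net/). However, when I try to create a Spark DataFrame from my MongoDB input, I keep getting a timeout error. Several related threads point to version compatibility problems between Spark and MongoDB. I am currently using Python 3.7.6, Spark 2.4.5, MongoDB Spark connector 2.4.2, and MongoDB Atlas 4.2.8.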

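I have tried changing the connection string using every permutation of the Mongo Java driver syntax I could find. I can connect to the MongoDB nodes from my local machine using MongoDB Compass, just not from my EMR master node. The same code connects locally, but when I try a simple connection with pymongo from EMR, it does not work. Both my local machine and EMR can ping the MongoDB nodes, but EMR cannot connect. I have also whitelisted the IP addresses of my computer and of the master node in MongoDB.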
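
Because ping only exercises ICMP, it may be worth checking separately whether the EMR master node can open a TCP connection to port 27017 on each shard host. The following is a minimal sketch using only the Python standard library. The function names `can_open_tcp` and `check_hosts` and the 5-second timeout are illustrative assumptions, and the host names in the usage comment are the ones that appear in the traceback in this question. A successful TCP connect does not by itself prove that the TLS handshake or the Atlas IP access list check will pass, so treat the result as one data point only.

```python
import socket


def can_open_tcp(host, port=27017, timeout=5.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the name and attempts the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False


def check_hosts(hosts, port=27017, timeout=5.0):
    """Map each host name to the result of can_open_tcp."""
    return {host: can_open_tcp(host, port, timeout) for host in hosts}


# Example usage on the EMR master node (host names taken from the traceback):
# print(check_hosts([
#     "news-shard-00-00.qd3xn.mongodb.net",
#     "news-shard-00-01.qd3xn.mongodb.net",
#     "news-shard-00-02.qd3xn.mongodb.net",
# ]))
```

If the hosts are reachable on TCP but the driver still reports a dropped stream, one thing to verify is that the whitelisted address is the public IP the cluster actually uses for outbound traffic, which may differ from the master node's private IP.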

I started my pyspark session in the EMR shell with the following command:

pyspark \
--conf "spark.mongodb.input.uri=mongodb+srv://<clustername>:<password>@news.qd3xn.mongodb.net/sample_airbnb?readPreference=primaryPreferred" \
--conf "spark.mongodb.input.database=sample_airbnb" \
--conf "spark.mongodb.input.collection=listingsAndReviews" \
--conf "spark.mongodb.output.uri=mongodb+srv://<clustername>:<password>@news.qd3xn.mongodb.net/sample_airbnb" \
--conf "mongo.job.input.format=com.mongodb.hadoop.MongoInputFormat" \
--packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.2"

After that, I tried to create a Spark DataFrame with the following command:

df1 = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", "mongodb+srv://<clustername>:<password>@news.qd3xn.mongodb.net/sample_airbnb.listingsAndReviews").load()

The error message I get is:

 >>> df1 = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri","mongodb+srv://<clustername>:<password>@news.qd3xn.mongodb.net/sample_airbnb.listingsAndReviews").load()
Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 172, in load
    return self._df(self._jreader.load())
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o109.load.
: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches com.mongodb.client.internal.MongoClientDelegate$1@1164cf33. Client view of cluster state is {type=REPLICA_SET, servers=[{address=news-shard-00-00.qd3xn.mongodb.net:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=news-shard-00-01.qd3xn.mongodb.net:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=news-shard-00-02.qd3xn.mongodb.net:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
        at com.mongodb.internal.connection.BaseCluster.createTimeoutException(BaseCluster.java:408)
        at com.mongodb.internal.connection.BaseCluster.selectServer(BaseCluster.java:123)
        at com.mongodb.internal.connection.AbstractMultiServerCluster.selectServer(AbstractMultiServerCluster.java:54)
        at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:154)
        at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:103)
        at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:284)
        at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:188)
        at com.mongodb.client.internal.MongoDatabaseImpl.executeCommand(MongoDatabaseImpl.java:194)
        at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:163)
        at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:158)
        at com.mongodb.spark.MongoConnector$$anonfun$1.apply(MongoConnector.scala:237)
        at com.mongodb.spark.MongoConnector$$anonfun$1.apply(MongoConnector.scala:237)
        at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:174)
        at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:174)
        at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:157)
        at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:174)
        at com.mongodb.spark.MongoConnector.hasSampleAggregateOperator(MongoConnector.scala:237)
        at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator$lzycompute(MongoRDD.scala:221)
        at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator(MongoRDD.scala:221)
        at com.mongodb.spark.sql.MongoInferSchema$.apply(MongoInferSchema.scala:68)
        at com.mongodb.spark.sql.DefaultSource.constructRelation(DefaultSource.scala:97)
        at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:50)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
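
The key information in this traceback is the "Client view of cluster state" part of the `MongoTimeoutException` message, which lists every server the driver tried and why each attempt failed. As a reading aid, here is a small sketch that pulls those per-server entries out of the message text. The function name `parse_server_states` and the regular expression are illustrative assumptions based only on the message format shown above, not part of any MongoDB API.

```python
import re

# Matches one "{address=..., type=..., state=...[, exception={...}]" entry.
_SERVER_RE = re.compile(
    r"\{address=(?P<address>[^,]+), type=(?P<type>\w+), state=(?P<state>\w+)"
    r"(?:, exception=\{(?P<exception>[^}]*)\})?"
)


def parse_server_states(message):
    """Return a list of dicts, one per server entry found in the message."""
    return [m.groupdict() for m in _SERVER_RE.finditer(message)]


# Example usage: paste the exception message into a string and inspect it.
# for entry in parse_server_states(error_text):
#     print(entry["address"], entry["state"], entry["exception"])
```

Applied to the message above, every entry shows `state=CONNECTING` with a `MongoSocketReadException: Prematurely reached end of stream`, meaning the driver found all three hosts but each connection was closed before a server reply could be read.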

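I do not fully understand what the error is saying. My cluster name is "news", and it looks like EMR is transforming the name somehow in order to reach news-shard-00-00. How can I fix this?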
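
On where the name news-shard-00-00 comes from: with the `mongodb+srv://` scheme, the driver looks up a DNS SRV record named `_mongodb._tcp.<host>` and connects to the hosts that record lists, so the name change comes from the driver's seed list lookup rather than from EMR. The sketch below only derives the record name that would be queried; the function name `srv_record_name` is an illustrative assumption, and the actual lookup (for example with a DNS tool on the master node) is left out.

```python
from urllib.parse import urlsplit


def srv_record_name(uri):
    """Return the DNS SRV record name a driver queries for a mongodb+srv URI."""
    parts = urlsplit(uri)
    if parts.scheme != "mongodb+srv":
        raise ValueError("expected a mongodb+srv:// URI")
    if not parts.hostname:
        raise ValueError("URI has no host name")
    # The SRV lookup uses the fixed service and protocol labels below.
    return "_mongodb._tcp." + parts.hostname


# Example usage with placeholder credentials:
# srv_record_name("mongodb+srv://user:secret@news.qd3xn.mongodb.net/sample_airbnb")
```

Comparing the result of that SRV lookup on the local machine and on the EMR master node would show whether both see the same three shard hosts.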

Tags: python-3.x, mongodb, pyspark, amazon-emr, connector

Solution

