Spark to MongoDB connection closes without a meaningful error

Problem description

I am using MongoDB version 3.4.7, Spark version 1.6.3, and MongoDB-Spark connector version 1.1.0.

I have a pyspark script that pulls data from a MongoDB collection to build a DataFrame. I noticed that my spark-submit fails after the connection is closed. (See the log output below.)

19/02/18 23:47:25 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, node1.dev.qwerty.asdf.io, partition 0,ASDF_LOCAL, 2476 bytes)
19/02/18 23:47:25 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on node1.dev.qwerty.asdf.io:45779 (size: 2.7 KB, free: 2.7 GB)
19/02/18 23:47:25 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on node1.dev.qwerty.asdf.io:45779 (size: 497.0 B, free: 2.7 GB)
19/02/18 23:47:29 INFO MongoClientCache: Closing MongoClient: [mongoconfig-001.zxcv.prod.rba.company.net:27017]
19/02/18 23:47:29 INFO connection: Closed connection [connectionId{localValue:2}] to mongoconfig-001.zxcv.prod.rba.company.net:27017 because the pool has been closed.
19/02/18 23:47:55 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, node1.dev.qwerty.asdf.io): com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=mongoconfig-001.zxcv.prod.rba.company.net:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused (Connection refused)}}]
    at com.mongodb.connection.BaseCluster.getDescription(BaseCluster.java:163)
    at com.mongodb.Mongo.getClusterDescription(Mongo.java:411)
    at com.mongodb.Mongo.getServerAddressList(Mongo.java:404)
    at com.mongodb.spark.connection.MongoClientCache$$anonfun$logClient$1.apply(MongoClientCache.scala:161)
    at com.mongodb.spark.connection.MongoClientCache$$anonfun$logClient$1.apply(MongoClientCache.scala:161)
    at com.mongodb.spark.LoggingTrait$class.logInfo(LoggingTrait.scala:48)
    at com.mongodb.spark.Logging.logInfo(Logging.scala:24)
    at com.mongodb.spark.connection.MongoClientCache.logClient(MongoClientCache.scala:161)
    at com.mongodb.spark.connection.MongoClientCache.acquire(MongoClientCache.scala:56)
    at com.mongodb.spark.MongoConnector.acquireClient(MongoConnector.scala:239)
    at com.mongodb.spark.rdd.MongoRDD.compute(MongoRDD.scala:141)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:247)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
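
To rule out a plain connectivity problem from the worker hosts, a minimal pymongo check against the same URI would look like this (a sketch, not part of my job; the placeholders are the same ones used in my script below, and serverSelectionTimeoutMS mirrors the 30000 ms in the log):

from pymongo import MongoClient

# Same URI placeholders as in the pyspark script; ping fails fast if the server is unreachable.
uri = "mongodb://<USER>:<PASSWORD>@<HOST>:<PORT>/?ssl=true&authSource=<DATABASENAME>"
client = MongoClient(uri, serverSelectionTimeoutMS=30000)
print(client.admin.command("ping"))  # raises ServerSelectionTimeoutError on "Connection refused"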

I'm not sure what is going on here. Can someone help me?

I am currently using the code below.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("pyspark test")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
# Read the MongoDB collection into a DataFrame via the connector's DefaultSource.
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://<USER>:<PASSWORD>@<HOST>:<PORT>/db.<COLLECTION>?ssl=true&authSource=<DATABASENAME>").load()
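
For reference, the connector's documented spark.mongodb.input.uri setting would also let the URI be set once on the SparkConf instead of per read (a minimal sketch; the placeholders are the same as above):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Sketch: put the input URI on the SparkConf so reads only need the format.
conf = (SparkConf()
        .setAppName("pyspark test")
        .set("spark.mongodb.input.uri",
             "mongodb://<USER>:<PASSWORD>@<HOST>:<PORT>/db.<COLLECTION>?ssl=true&authSource=<DATABASENAME>"))
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load()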

I am invoking the script above with the spark-submit command below.

spark-submit --master yarn --verbose --jars mongo-java-driver-3.4.2.jar,mongo-spark-connector_2.10-1.1.0.jar --py-files pymongo_spark.py test.py
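
An equivalent submit could fetch the connector from Maven with --packages instead of shipping the jars by hand (a sketch; the coordinate matches connector 1.1.0 above, and the Java driver would be pulled in transitively):

spark-submit --master yarn --verbose \
  --packages org.mongodb.spark:mongo-spark-connector_2.10:1.1.0 \
  --py-files pymongo_spark.py \
  test.py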

Tags: python, mongodb, apache-spark, pyspark, pymongo

Solution

