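python - Spark connection to MongoDB closes without a meaningful error
Problem description
I am using MongoDB version 3.4.7, Spark version 1.6.3, and MongoDB Spark Connector version 1.1.0.
I have a pyspark script that pulls data from a MongoDB collection to create a DataFrame. I noticed that my spark-submit job fails after the connection is closed (see the log output below).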
19/02/18 23:47:25 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, node1.dev.qwerty.asdf.io, partition 0,ASDF_LOCAL, 2476 bytes)
19/02/18 23:47:25 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on node1.dev.qwerty.asdf.io:45779 (size: 2.7 KB, free: 2.7 GB)
19/02/18 23:47:25 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on node1.dev.qwerty.asdf.io:45779 (size: 497.0 B, free: 2.7 GB)
19/02/18 23:47:29 INFO MongoClientCache: Closing MongoClient: [mongoconfig-001.zxcv.prod.rba.company.net:27017]
19/02/18 23:47:29 INFO connection: Closed connection [connectionId{localValue:2}] to mongoconfig-001.zxcv.prod.rba.company.net:27017 because the pool has been closed.
19/02/18 23:47:55 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, node1.dev.qwerty.asdf.io): com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=mongoconfig-001.zxcv.prod.rba.company.net:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused (Connection refused)}}]
at com.mongodb.connection.BaseCluster.getDescription(BaseCluster.java:163)
at com.mongodb.Mongo.getClusterDescription(Mongo.java:411)
at com.mongodb.Mongo.getServerAddressList(Mongo.java:404)
at com.mongodb.spark.connection.MongoClientCache$$anonfun$logClient$1.apply(MongoClientCache.scala:161)
at com.mongodb.spark.connection.MongoClientCache$$anonfun$logClient$1.apply(MongoClientCache.scala:161)
at com.mongodb.spark.LoggingTrait$class.logInfo(LoggingTrait.scala:48)
at com.mongodb.spark.Logging.logInfo(Logging.scala:24)
at com.mongodb.spark.connection.MongoClientCache.logClient(MongoClientCache.scala:161)
at com.mongodb.spark.connection.MongoClientCache.acquire(MongoClientCache.scala:56)
at com.mongodb.spark.MongoConnector.acquireClient(MongoConnector.scala:239)
at com.mongodb.spark.rdd.MongoRDD.compute(MongoRDD.scala:141)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:247)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I'm not sure what is happening here. Can someone help me?
I am currently using the code below.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("pyspark test")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://<USER>:<PASSWORD>@<HOST>:<PORT>/db.<COLLECTION>?ssl=true&authSource=<DATABASENAME>").load()
I invoke the script above with the following spark-submit command:
spark-submit --master yarn --verbose --jars mongo-java-driver-3.4.2.jar,mongo-spark-connector_2.10-1.1.0.jar --py-files pymongo_spark.py test.py
Solution
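The log does not identify a root cause, but the last exception in the trace is java.net.ConnectException: Connection refused, raised inside a task on the executor node (node1.dev.qwerty.asdf.io) rather than on the driver. One possible first diagnostic, sketched here under assumptions, is to check whether the MongoDB host and port are reachable over plain TCP from the executors. The helper names can_connect and check_from_executors are hypothetical and not part of pyspark or the MongoDB connector; host and port stand for whatever <HOST> and <PORT> are in the URI.

```python
import socket


def can_connect(host, port, timeout=5.0):
    """Return True if a plain TCP connection to host:port can be opened."""
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
    except socket.error:
        # Covers refused connections, timeouts, and DNS failures.
        return False
    sock.close()
    return True


def check_from_executors(sc, host, port, num_tasks=4):
    """Run can_connect inside Spark tasks (hypothetical helper).

    Returns a list of (executor hostname, reachable) pairs, one per task.
    """
    def probe(_):
        return (socket.gethostname(), can_connect(host, port))

    return sc.parallelize(range(num_tasks), num_tasks).map(probe).collect()
```

Calling can_connect on the driver and check_from_executors(sc, host, port) on the cluster shows whether the driver and the worker nodes see the server differently. This only tests TCP reachability; it does not exercise the ssl=true handshake or authentication, so a True result rules out only the network-level refusal.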