postgresql - Spark sql查询卡住
问题描述
我们运行一个 Spark 流应用程序(Spark 版本 - 2.4.3),并使用预定线程从 postgres DB 中检索配置。我们使用火花做到这一点
spark.read().format("jdbc").options(options).load()
问题是我们注意到在几次迭代期间,查询只是卡住了。没有任何事情发生。在此之后的调用仍然有效,并且流操作继续。我提取了线程转储,这就是我得到的。
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:171)
java.net.SocketInputStream.read(SocketInputStream.java:141)
sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
sun.security.ssl.InputRecord.read(InputRecord.java:503)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:975) => holding Monitor(java.lang.Object@1651260651})
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:933)
sun.security.ssl.AppInputStream.read(AppInputStream.java:105) => holding Monitor(sun.security.ssl.AppInputStream@1497069430})
org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:143)
org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:112)
org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:71)
org.postgresql.core.PGStream.ReceiveChar(PGStream.java:282)
org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1803)
org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:255) => holding Monitor(org.postgresql.core.v3.QueryExecutorImpl@2049145458})
org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:570)
org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:420)
org.postgresql.jdbc2.AbstractJdbc2Statement.executeQuery(AbstractJdbc2Statement.java:305)
org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:304)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
org.apache.spark.scheduler.Task.run(Task.scala:121)
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
知道为什么会这样吗?
解决方案
推荐阅读
- javascript - 如何在javascript中分割文本
- unity3d - 需要帮助实施 Perforce depot 或 GitHub repot,允许异地云托管与本地计算机之间的现场主服务器同步
- pandas - 在 pandas 的 groupby 期间根据列是否包含特定字符串来创建变量
- r - 如何从R中的数据框中删除重复项
- android - 有两个intent filter的activity重启了,但是重启的时候主intent-filter没有被调用,其他intentet filter被调用
- salesforce - 无法连接到 Salesforce api
- django - 移动设备上的 Django 实时访问
- java - 我在哪里可以下载用于 Java 的 rt.jar?
- python - Python pandas 替代 'map' 用于 2 个变量的函数
- python - Pandas Groupby 并使用自定义值创建新列