apache-spark - 如何在 spark-jdbc 应用程序中给出表名以读取 RDBMS 数据库上的数据?
问题描述
我正在尝试使用 spark 读取 greenplum 数据库上的表,如下所示:
val execQuery = s"select ${allColumns}, 0 as ${flagCol} from schema.table where period_year=2017 and period_num=12"
val yearDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2016")
.option("user", devUserName)
.option("password", devPassword)
.option("partitionColumn","header_id")
.option("lowerBound", 16550)
.option("upperBound", 1152921481695656862L)
.option("numPartitions",450).load()
当我使用 spark-submit 运行代码时,出现异常:
Exception in thread "main" org.postgresql.util.PSQLException: ERROR: relation "public.(select je_header_id,source_system_name,je_line_num,last_update" does not exist
Position: 15
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2310)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2023)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:217)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:421)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:318)
at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:281)
at com.zaxxer.hikari.pool.ProxyStatement.executeQuery(ProxyStatement.java:111)
at com.zaxxer.hikari.pool.HikariProxyStatement.executeQuery(HikariProxyStatement.java)
at io.pivotal.greenplum.spark.jdbc.Jdbc$.resolveTable(Jdbc.scala:301)
at io.pivotal.greenplum.spark.GreenplumRelationProvider.createRelation(GreenplumRelationProvider.scala:29)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:309)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
at com.partition.source.YearPartition$.prepareFinalDF$1(YearPartition.scala:141)
at com.partition.source.YearPartition$.main(YearPartition.scala:164)
at com.partition.source.YearPartition.main(YearPartition.scala)
在execQuery
我可以看到模式名称和表名正确形成。当我提交代码时,它说public.(select je_header_id,source_system_name,) relation not found
. 我不明白为什么将public
模式名称和查询(select je_header_id,source_system_name,je_line_num,last_update"
作为表名。
谁能让我知道我在这里犯了什么错误以及如何解决它?
解决方案
如果您使用的是 spark jdbc ,则可以包装查询并将其传递给 dbtable 参数。如果关键就像任何 jdbc 一样工作,这应该可以工作。
val query = """
(select a.id,b,id,a.name from a left outer join b on a.id=b.id
limit 100) foo
"""
val df = sqlContext.format("jdbc").
option("url", "jdbc:mysql://localhost:3306/local_content").
option("driver", "com.mysql.jdbc.Driver").
option("useUnicode", "true").
option("continueBatchOnError","true").
option("useSSL", "false").
option("user", "root").
option("password", "").
option("dbtable",query).
load()
推荐阅读
- excel - 调用时显示单独定位的用户窗体跳转
- meteor - 模板助手中的异常:TypeError:无法读取未定义的属性“mergedSchema”
- docker - Docker:安装 apt-utils 时遇到问题
- android - 如何在 React Native for Android 中为文本输入定义插入符号颜色?
- wildfly-swarm - Wildfly Swarm 2018.5.0 不启动 HTTPS 监听
- python - 将参数传递给python中的装饰器
- bash - 如何在 curl 请求中扩展文件名
- vba - 如果语句在 VBA 中没有返回正确的值
- ios - 在 Swift 中同时 Deinit 两个 ViewController
- python-3.x - 无法单击使用 selenium 和 python 进行抓取的按钮