PySpark writing to a DB2 table: truncate on overwrite

Problem description

I am trying to write data to IBM DB2 (10.5 fix pack 11) using PySpark (2.4). When I run the code below:

# Wrapped in parentheses so the chained calls can span several lines.
(df.write.format("jdbc")
    .mode("overwrite")
    .option("url", "jdbc:db2://<host>:<port>/<DB>")
    .option("driver", "com.ibm.db2.jcc.DB2Driver")
    .option("sslConnection", "true")
    .option("sslCertLocation", "</location/***_ssl.crt?")
    .option("numPartitions", 1)
    .option("batchsize", 1000)
    .option("truncate", "true")  # ask Spark to truncate instead of drop and recreate
    .option("dbtable", "<TABLE>")
    .option("user", "<user>")
    .option("password", "<PW>")
    .save())

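the job throws the following exception:

File "/usr/local/Cellar/apache-spark/3.0.1/libexec/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o97.save.
: com.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-104, SQLSTATE=42601, SQLERRMC=END-OF-STATEMENT;ABLE<SEHEMA.TABLE>;IMMEDIATE, DRIVER=4.19.80
    at com.ibm.db2.jcc.am.b5.a(b5.java:747)

The job is trying to truncate the table, but DB2 seems to expect the IMMEDIATE keyword at the end of the statement.

In the code above I only pass the table name as dbtable. Is there a way to pass the IMMEDIATE keyword as well?

And on the DB2 side, is there a way to set this when the session is opened?

FYI, my code works without the truncate option, but then Spark drops the table, recreates it, and reloads it, which I don't want to do in a prod environment.

Any ideas on how to solve this are much appreciated.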

Tags: apache-spark, jdbc, pyspark, db2

Solution


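DB2Dialect in Spark 2.4 does not override the default JdbcDialect implementation of getTruncateQuery, so Spark sends a plain TRUNCATE TABLE statement, which DB2 rejects because it expects the statement to end with IMMEDIATE. The comments in the Spark source suggest overriding this method to return a statement that suits your database engine: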

  /**
   * The SQL query that should be used to truncate a table. Dialects can override this method to
   * return a query that is suitable for a particular database. For PostgreSQL, for instance,
   * a different query is used to prevent "TRUNCATE" affecting other tables.
   * @param table The table to truncate
   * @param cascade Whether or not to cascade the truncation
   * @return The SQL query to use for truncating a table
   */
  @Since("2.4.0")
  def getTruncateQuery(
    table: String,
    cascade: Option[Boolean] = isCascadingTruncateTable): String = {
      s"TRUNCATE TABLE $table"
  } 

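For DB2, you could perhaps build on DB2Dialect: write your own dialect with a getTruncateQuery() implementation that appends IMMEDIATE, and have it handle a "custom" JDBC protocol, "jdbc:mydb2" for example. As far as I can tell DB2Dialect is a private object in Spark 2.4, so in practice this probably means extending JdbcDialect and registering the new dialect with JdbcDialects.registerDialect. You would then use this protocol in the JDBC connection URL: .option("url", 'jdbc:mydb2://<host>:<port>/<DB>').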
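
If writing a Scala dialect is not convenient from PySpark, a different workaround (not the dialect approach described above) is to run the truncate yourself over JDBC and then write in append mode, so Spark never generates its own TRUNCATE statement. The following is only a minimal sketch under assumptions: the helper names db2_truncate_statement and truncate_then_append are hypothetical, the DB2 driver jar is assumed to be on the Spark driver's classpath, and the JVM is reached through the internal spark.sparkContext._jvm handle. The truncate and the load are two separate steps, so they are not atomic.

```python
def db2_truncate_statement(table):
    """Build a truncate statement ending in IMMEDIATE, the keyword DB2 asks for."""
    return "TRUNCATE TABLE {} IMMEDIATE".format(table)


def truncate_then_append(spark, df, url, table, props,
                         driver="com.ibm.db2.jcc.DB2Driver"):
    """Hypothetical helper: truncate the table over JDBC, then append df to it.

    props is a dict of connection properties such as user, password and
    sslConnection; the same values are passed to the JDBC connection and
    to the Spark writer.
    """
    jvm = spark.sparkContext._jvm  # internal py4j handle (assumption: available)

    # Copy the Python dict into a java.util.Properties object.
    jprops = jvm.java.util.Properties()
    for key, value in props.items():
        jprops.setProperty(key, str(value))

    # Assumes the driver is visible to java.sql.DriverManager on the Spark driver.
    conn = jvm.java.sql.DriverManager.getConnection(url, jprops)
    try:
        stmt = conn.createStatement()
        try:
            # New JDBC connections default to auto-commit, so this runs
            # as its own unit of work.
            stmt.execute(db2_truncate_statement(table))
        finally:
            stmt.close()
    finally:
        conn.close()

    # Append mode: Spark inserts rows without dropping or truncating the table.
    (df.write.format("jdbc")
        .mode("append")
        .option("url", url)
        .option("driver", driver)
        .option("dbtable", table)
        .options(**props)
        .save())
```

A call might look like truncate_then_append(spark, df, "jdbc:db2://<host>:<port>/<DB>", "<TABLE>", {"user": "<user>", "password": "<PW>", "sslConnection": "true"}). The table structure, grants, and indexes stay in place because the table is never dropped.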

