首页 > 解决方案 > 如何在不使用 HDP 3.1 中的仓库连接器的情况下将表从 spark 中写入 hive

问题描述

当尝试在 HDP 3.1 上使用 spark 2.3 写入 Hive 表时,无需使用仓库连接器直接进入 hives 模式,使用:

spark-shell --driver-memory 16g --master local[3] --conf spark.hadoop.metastore.catalog.default=hive
val df = Seq(1,2,3,4).toDF
spark.sql("create database foo")
df.write.saveAsTable("foo.my_table_01")

失败:

Table foo.my_table_01 failed strict managed table checks due to the following reason: Table is marked as a managed table but is not transactional

但是一个:

val df = Seq(1,2,3,4).toDF.withColumn("part", col("value"))
df.write.partitionBy("part").option("compression", "zlib").mode(SaveMode.Overwrite).format("orc").saveAsTable("foo.my_table_02")

火花与spark.sql("select * from foo.my_table_02").show作品就好了。现在去蜂巢/直线:

0: jdbc:hive2://hostname:2181/> select * from my_table_02;
Error: java.io.IOException: java.lang.IllegalArgumentException: bucketId out of range: -1 (state=,code=0)

一个

 describe extended my_table_02;

返回

 +-----------------------------+----------------------------------------------------+----------+
|          col_name           |                     data_type                      | comment  |
+-----------------------------+----------------------------------------------------+----------+
| value                       | int                                                |          |
| part                        | int                                                |          |
|                             | NULL                                               | NULL     |
| # Partition Information     | NULL                                               | NULL     |
| # col_name                  | data_type                                          | comment  |
| part                        | int                                                |          |
|                             | NULL                                               | NULL     |
| Detailed Table Information  | Table(tableName:my_table_02, dbName:foo, owner:hive/bd-sandbox.t-mobile.at@SANDBOX.MAGENTA.COM, createTime:1571201905, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:value, type:int, comment:null), FieldSchema(name:part, type:int, comment:null)], location:hdfs://bd-sandbox.t-mobile.at:8020/warehouse/tablespace/external/hive/foo.db/my_table_02, inputFormat:org.apache.hadoop.hive.ql.io.orc.OrcInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.ql.io.orc.OrcSerde, parameters:{path=hdfs://bd-sandbox.t-mobile.at:8020/warehouse/tablespace/external/hive/foo.db/my_table_02, compression=zlib, serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[FieldSchema(name:part, type:int, comment:null)], parameters:{numRows=0, rawDataSize=0, spark.sql.sources.schema.partCol.0=part, transient_lastDdlTime=1571201906, bucketing_version=2, spark.sql.create.version=2.3.2.3.1.0.0-78, totalSize=740, spark.sql.sources.schema.numPartCols=1, spark.sql.sources.schema.part.0={\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}, numFiles=4, numPartitions=4, spark.sql.partitionProvider=catalog, spark.sql.sources.schema.numParts=1, spark.sql.sources.provider=orc, transactional=true}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, rewriteEnabled:false, catName:hive, ownerType:USER, writeId:-1) |

如何在不使用仓库连接器的情况下使用 spark 写入 hive,但仍然写入稍后可以被 hive 读取的同一个元存储?据我所知,外部表应该是可能的(你不是管理的,不是 ACID 不是事务的),但我不知道如何告诉saveAsTable如何处理这些。

编辑

相关问题:

可能是像https://github.com/qubole/spark-acid这样的解决方法,比如https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.4/integrating-hive/content/hive_hivewarehouseconnector_for_handling_apache_spark_data.html但是我不喜欢在我还没有看到任何大规模性能测试的情况下使用更多胶带的想法。此外,这意味着更改所有现有的 spark 作业。

事实上,无法将表保存到配置单元元存储,HDP 3.0报告了大型数据帧和仓库连接器的问题。

编辑

我刚刚找到https://community.cloudera.com/t5/Support-Questions/Spark-hive-warehouse-connector-not-loading-data-when-using/td-p/243613

和:

执行()与执行查询()

ExecuteQuery() 将始终使用 Hiveserver2-interactive/LLAP,因为它使用快速 ARROW 协议。当 jdbc URL 指向非 LLAP Hiveserver2 时使用它会产生错误。

Execute() 使用 JDBC 并且对 LLAP 没有这种依赖关系,但有一个内置限制,最多只能返回 1.000 条记录。但对于大多数查询(INSERT INTO ... SELECT、count、sum、average)来说,这不是问题。

但这不会扼杀 hive 和 spark 之间的任何高性能互操作性吗?特别是如果没有足够的 LLAP 节点可用于大规模 ETL。

事实上,这是真的。可以在https://github.com/hortonworks-spark/spark-llap/blob/26d164e62b45cfa1420d5d43cdef13d1d29bb877/src/main/java/com/hortonworks/spark/sql/hive/llap/HWConf.java#L39配置此设置,虽然我不确定增加这个值对性能的影响

标签: apache-sparkhiveapache-spark-sqlhdphadoop3

解决方案


“如何在不使用仓库连接器的情况下使用 spark 写入 hive,但仍然写入稍后可以被 hive 读取的同一个元存储?”

我们正在使用相同的设置(HDP 3.1 和 Spark 2.3)。使用下面的代码,我们收到与“bucketId out of range: -1”相同的错误消息。解决方案是set hive.fetch.task.conversion=none;在尝试查询表之前在 Hive shell 中运行。

在没有 HWC 的情况下将数据写入 Hive 的代码:

  val warehouseLocation = new File("spark-warehouse").getAbsolutePath

  case class Record(key: Int, value: String)

  val spark = SparkSession.builder()
    .master("yarn")
    .appName("SparkHiveExample")
    .config("spark.sql.warehouse.dir", warehouseLocation)
    .enableHiveSupport()
    .getOrCreate()

  spark.sql("USE databaseName")
  val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
  recordsDF.write.mode(SaveMode.Overwrite).format("orc").saveAsTable("sparkhive_records")

[示例来自 https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html]


推荐阅读