json - 使用 sqlcontext.sql(...) 动态创建 Hive 外部表
问题描述
我在 Zeppelin 笔记本中有一个 pyspark 脚本,我将它指向位于 BLOB 存储中的 JSON 文件,以便推断 JSON 模式并在 Hive 中创建一个外部表。
我可以获取从脚本打印的 SQL 命令,并在单独的段落中执行它,并且表创建得很好,但是当我尝试通过 sqlcontext.sql() 方法创建表时,出现以下错误;
AnalysisException: u'org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: MetaException(message:java.lang.ClassNotFoundException Class org.openx.data.jsonserde.JsonSerDe not found);'
谷歌搜索这个错误只会显示确保 SerDe 的 JAR 文件在服务器上的页面,很明显,因为我可以手动创建这个表。下面是我的脚本;
%spark2.pyspark
import os
import datetime as dt
import time
from datetime import date
from pyspark.sql.functions import monotonically_increasing_id, lit
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import split, lower, unix_timestamp, from_unixtime
hiveDbName = 'dev_phoenix'
hiveTableName = 'et_engagement_cac'
serdeName = 'org.openx.data.jsonserde.JsonSerDe'
jsonFileLocation = 'wasbs://blah-blah-blah@meh-meh-meh.blob.core.windows.net/dev/data/Engagement'
jsonDf = sqlContext.read.json("wasbs://blah-blah-blah@meh-meh-meh.blob.core.windows.net/dev/data/Engagement/Engagement.json")
# jsonDf.printSchema()
extTableDDL = "create external table " + hiveDbName + "." + hiveTableName + "(\n"
for col in jsonDf.dtypes:
extTableDDL += '`' + col[0] + '` ' + col[1].replace('_id','`_id`') + ',\n'
extTableDDL = extTableDDL[:-2]
extTableDDL += ')\nrow format serde \'' + serdeName + '\'\n'
extTableDDL += 'location \'' + jsonFileLocation + '\'\n'
extTableDDL += 'tblproperties (\'serialization.null.format\'=\'\')'
print extTableDDL
sqlContext.sql(extTableDDL)
我故意混淆了我们的 WASB 容器名称,因此使用了 blah/meh。
我发现一些帖子让我开始认为可以使用 sqlcontext.sql 创建的表类型存在限制,也许我想做的事情是不可能的?
当我取出 SerDe 声明时,我能够成功创建表,但是 Hive 使用了默认的 SerDe,它不适用于我在基础文件中的数据。
解决方案
好的,所以我想我知道发生了什么,以及如何解决它。我怀疑我正在尝试使用的 SerDe 的 JAR 文件位于服务器上的一个目录中,该目录不在类路径变量中。
所以,我第一次调用 spark.sql(...) 来添加 JAR,现在它正在工作。请参阅下面的更新脚本;
%spark2.pyspark
import os
import datetime as dt
import time
from datetime import date
from pyspark.sql.functions import monotonically_increasing_id, lit
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import split, lower, unix_timestamp, from_unixtime
hiveDbName = 'dev_phoenix'
hiveTableName = 'et_engagement_cac'
serdeName = 'org.openx.data.jsonserde.JsonSerDe'
jsonFileLocation = 'wasbs://blah-blah-blah@meh-meh-meh.blob.core.windows.net/dev/data/Engagement'
jsonDf = spark.read.json("wasbs://blah-blah-blah@meh-meh-meh.blob.core.windows.net/dev/data/Engagement/Engagement.json")
# jsonDf.printSchema()
spark.sql('add jar /usr/hdp/current/hive-client/lib/json-serde-1.3.8-jar-with-dependencies.jar')
extTableDDL = "create external table " + hiveDbName + "." + hiveTableName + "(\n"
for col in jsonDf.dtypes:
extTableDDL += '`' + col[0] + '` ' + col[1].replace('_id','`_id`').replace('_class','`_class`') + ',\n'
extTableDDL = extTableDDL[:-2]
extTableDDL += ')\nROW FORMAT SERDE\n'
extTableDDL += ' \'' + serdeName + '\'\n'
extTableDDL += 'STORED AS INPUTFORMAT\n'
extTableDDL += ' \'org.apache.hadoop.mapred.TextInputFormat\'\n'
extTableDDL += 'OUTPUTFORMAT\n'
extTableDDL += ' \'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\'\n'
extTableDDL += 'location \'' + jsonFileLocation + '\'\n'
extTableDDL += 'tblproperties (\'serialization.null.format\'=\'\')'
print extTableDDL
spark.sql(extTableDDL)
推荐阅读
- django - 如何将 view.py 中计算的输出与 form.py 中的其余数据一起保存
- javascript - Chrome 没有显示新更新的 html、javascript?或许可以俯瞰?
- selenium - Selenium webDriver,操作是否可能出现乱序?
- python - User ForiegnKey 现在在 django rest 框架视图集中显示为一个字段
- angular - 使用资产目录中的 SVG 并放置在 xlink:href
- c# - 子 ViewModel 如何提示父 ViewModel 在 Caliburn.Micro 中导航离开?
- python - 如何在snakemake中修复这个“IndexError:list index out of range”
- c++ - 二进制表达式的无效操作数(字符串到字符串(又名基本字符串))
- node.js - ldap authentication unauthorized
- r - 为 R 中的所有观测值生成多个分类变量水平的频率表