hive - 在 HDP 3.1 集群上执行 flink 1.10 以访问 hive 表
问题描述
我想在安全的 kerberized HDP 3.1 集群上使用 apache flink,但我仍然坚持第一步。
最新版本已下载并解压缩(https://flink.apache.org/downloads.html#apache-flink-1101)
现在,我尝试关注 https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/hive/
要与 Hive 集成,您需要在 Flink 发行版中的 /lib/ 目录中添加一些额外的依赖项,以便在 Table API 程序或 SQL Client 中的 SQL 中进行集成。或者,您可以将这些依赖项放在专用文件夹中,并使用 -C 或 -l 选项将它们添加到类路径中
由于 HDP 环境:
- 现有配置位于:
/usr/hdp/current/hive-server2/conf/hive-site.xml
- JAR 位于
/usr/hdp/current/hive-server2/lib
. 可能会使用 flink 提供的 jars。但我更喜欢直接从 HDP 发行版中使用 Hive 罐子https://github.com/apache/flink/pull/11328/files/b4a76d76d2c1e9722befabc03b2191d053c70fa8#diff-ecb34d0bf175b780ec6ca71da8ec23beR111
我怎样才能:
- 告诉 flink 使用/加载这些到类路径
- 是否有可能省略第一个配置步骤(nae、defaultdb、confdir、版本)并以某种方式从 hive-site.xml 中自动推断?
- 启动一个交互式 shell(类似于 spark-shell,即类似于 flinks 交互式 sql shell,但基于 scala),以便按照链接中概述的以下步骤进行操作。
编码
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val tableEnv = TableEnvironment.create(settings)
val name = "myhive"
val defaultDatabase = "mydatabase"
val hiveConfDir = "/opt/hive-conf" // a local path
val version = "2.3.4"
val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
tableEnv.registerCatalog("myhive", hive)
// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive")
编辑
想必需要以下内容才能找到hadoop配置:
export HADOOP_CONF_DIR=/usr/hdp/current/spark2-client/conf
以及: https ://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html 即:
export HADOOP_CLASSPATH=$(hadoop classpath)
现在,即使没有任何 hive 支持,我仍然无法启动 flink shell:
cd /path/to/flink-1.10.1/bin
./start-scala-shell.sh
Error: Could not find or load main class org.apache.flink.api.scala.FlinkShell
这个初步问题似乎可以通过切换到较旧的 2.11 版本来解决Flink 1.7.2 start-scala-shell.sh cannot find or load main class org.apache.flink.api.scala.FlinkShell
./start-scala-shell.sh local
已经可以让我启动本地外壳了。
./start-scala-shell.sh yarn
启动一些东西(本地),但没有启动纱线容器。
同时我已经设置:
catalogs:
- name: myhive
type: hive
hive-conf-dir: /usr/hdp/current/hive-server2/conf
hive-version: 3.1.2
在本地 flink 配置中。我仍然不清楚是否只需指定上述环境变量就可以使其自动工作。
但是,对我而言,由于未加载 env,因此代码无法编译:
scala> env
<console>:68: error: not found: value env
但试图手动指定
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
// environment configuration
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)
也失败了
.UnsupportedOperationException: Execution Environment is already defined for this shell.
编辑 2
和:
cd /path/to/flink-1.10.1
export HADOOP_CONF_DIR=/usr/hdp/current/spark2-client/conf
export HADOOP_CLASSPATH=$(hadoop classpath)
./bin/yarn-session.sh --queue my_queue -n 1 -jm 768 -tm 1024
我可以在 YARN 上成功启动一个简约的 flink 集群(没有 ambari 服务)。尽管安装 ambari 集成是有意义的。
目前,我还无法测试与 kerberized Hive 和 HDFS 的交互是否/如何工作。此外,目前,我无法启动交互式外壳 - 如下所述。
事实上,即使在游乐场非 kerberized 环境中,我也观察到 flinks 交互式 shell flink start scala shell - numberformat exepction 的问题
编辑 3
我不知道发生了什么变化,但有:
cd /home/at/heilerg/development/software/flink-1.10.1
export HADOOP_CONF_DIR=/usr/hdp/current/spark2-client/conf
export HADOOP_CLASSPATH=$(hadoop classpath)
./bin/start-scala-shell.sh local
btenv.listDatabases
//res12: Array[String] = Array(default_database)
btenv.listTables
//res9: Array[String] = Array()
我可以在本地模式下获取批处理表环境。目前,虽然没有来自 hive 的表或数据库。
注意:配置设置如下:
catalogs: - name: hdphive type: hive hive-conf-dir: /usr/hdp/current/hive-server2/conf hive-version: 3.1.2
当尝试使用代码而不是配置时,我无法导入HiveCatalog
:
val name = "hdphive"
val defaultDatabase = "default"
val hiveConfDir = "/usr/hdp/current/hive-server2/conf" // a local path
val version = "3.1.2" //"2.3.4"
import org.apache.flink.table.catalog.hive.HiveCatalog
// for now, I am failing here
// <console>:67: error: object hive is not a member of package org.apache.flink.table.catalog
// import org.apache.flink.table.catalog.hive.HiveCatalog
val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
tableEnv.registerCatalog(name, hive)
tableEnv.useCatalog(name)
btenv.listDatabases
这些罐子被手动放入lib
目录中:
- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber/3.1.1.7.0.3.0-79-7.0
- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive_2.11/1.10.1
无论 hive 罐子的版本如何,我都面临缺少 Hive 类:
val version = "3.1.2" // or "3.1.0" // has the same problem
import org.apache.flink.table.catalog.hive.HiveCatalog
val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
java.lang.NoClassDefFoundError: org/apache/hadoop/hive/metastore/api/NoSuchObjectException
... 30 elided
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.metastore.api.NoSuchObjectException
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 30 more
但不是:export HADOOP_CLASSPATH=$(hadoop classpath)
用于加载 HDP 类?
无论如何:
cp /usr/hdp/current/hive-client/lib/hive-exec-3.1.0.<<<version>>>.jar /path/to/flink-1.10.1/lib
意味着我更进一步:
val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
btenv.registerCatalog(name, hive)
Caused by: java.lang.ClassNotFoundException: com.facebook.fb303.FacebookService$Iface
将https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/添加到lib
目录后
btenv.registerCatalog(name, hive)
不会抱怨未找到类异常,但执行似乎在此步骤停留了几分钟。然后,它因 kerberos 异常而失败:
Caused by: org.apache.hadoop.hive.metastore.api.MetaException: Could not connect to meta store using any of the URIs provided. Most recent failure: org.apache.thrift.transport.TTransportException: GSS initiate failed
我才意识到:
<property>
<name>hive.metastore.kerberos.principal</name>
<value>hive/_HOST@xxxx</value>
</property>
和
klist
Default principal: user@xxxx
这里的主体与 hive-site.xml 中的主体不匹配。但是,spark 可以使用此处概述的相同配置和主体不匹配很好地读取元存储。
解决方案
推荐阅读
- ios - 了解委托将如何在代码中触发
- curl - 从大量文件中获取所有以 http/https 开头并以 exe、msi 等结尾的 url,并测试 http 响应代码并将输出写入文件
- javascript - 目前使用 iframe 作为美化脚本标签(沙箱我的 JavaScript 代码)。我可以使用工作人员、共享工作人员或类似人员吗?
- javascript - Google 令牌已过期或被撤销
- java - java.io.FileNotFoundException:类路径资源 [config/foo.yml] 无法打开,因为它不存在
- go - 多个 goroutine 在一个通道上选择性地监听
- html - 如何在 html 条中对齐组件
- amazon-s3 - Parquet 文件上的 includeOpForFullLoad 属性 - AWS DMS
- python - 获取用户输入的句子/字符串并比较列表中的项目,如果句子中的关键字与列表项匹配,则返回列表条目
- hadoop - hadoop中多个Namenodes的优缺点