首页 > 解决方案 > 在 HDP 3.1 集群上执行 flink 1.10 以访问 hive 表

问题描述

我想在安全的 kerberized HDP 3.1 集群上使用 apache flink,但我仍然坚持第一步。

最新版本已下载并解压缩(https://flink.apache.org/downloads.html#apache-flink-1101

现在,我尝试关注 https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/hive/

要与 Hive 集成,您需要在 Flink 发行版中的 /lib/ 目录中添加一些额外的依赖项,以便在 Table API 程序或 SQL Client 中的 SQL 中进行集成。或者,您可以将这些依赖项放在专用文件夹中,并使用 -C 或 -l 选项将它们添加到类路径中

由于 HDP 环境:

我怎样才能:

  1. 告诉 flink 使用/加载这些到类路径
  2. 是否有可能省略第一个配置步骤(nae、defaultdb、confdir、版本)并以某种方式从 hive-site.xml 中自动推断?
  3. 启动一个交互式 shell(类似于 spark-shell,即类似于 flinks 交互式 sql shell,但基于 scala),以便按照链接中概述的以下步骤进行操作。

编码

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val tableEnv = TableEnvironment.create(settings)

val name            = "myhive"
val defaultDatabase = "mydatabase"
val hiveConfDir     = "/opt/hive-conf" // a local path
val version         = "2.3.4"

val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
tableEnv.registerCatalog("myhive", hive)

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive")

编辑

想必需要以下内容才能找到hadoop配置:

export HADOOP_CONF_DIR=/usr/hdp/current/spark2-client/conf

以及: https ://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html 即:

export HADOOP_CLASSPATH=$(hadoop classpath)

现在,即使没有任何 hive 支持,我仍然无法启动 flink shell:

cd /path/to/flink-1.10.1/bin
./start-scala-shell.sh
Error: Could not find or load main class org.apache.flink.api.scala.FlinkShell

这个初步问题似乎可以通过切换到较旧的 2.11 版本来解决Flink 1.7.2 start-scala-shell.sh cannot find or load main class org.apache.flink.api.scala.FlinkShell

./start-scala-shell.sh local

已经可以让我启动本地外壳了。

./start-scala-shell.sh yarn

启动一些东西(本地),但没有启动纱线容器。

同时我已经设置:

catalogs:
   - name: myhive
     type: hive
     hive-conf-dir: /usr/hdp/current/hive-server2/conf
     hive-version: 3.1.2

在本地 flink 配置中。我仍然不清楚是否只需指定上述环境变量就可以使其自动工作。

但是,对我而言,由于未加载 env,因此代码无法编译:

scala> env
<console>:68: error: not found: value env

但试图手动指定

import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._

// environment configuration
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)

也失败了

.UnsupportedOperationException: Execution Environment is already defined for this shell.

编辑 2

和:

cd /path/to/flink-1.10.1
export HADOOP_CONF_DIR=/usr/hdp/current/spark2-client/conf
export HADOOP_CLASSPATH=$(hadoop classpath)

./bin/yarn-session.sh --queue my_queue -n 1 -jm 768 -tm 1024

我可以在 YARN 上成功启动一个简约的 flink 集群(没有 ambari 服务)。尽管安装 ambari 集成是有意义的。

目前,我还无法测试与 kerberized Hive 和 HDFS 的交互是否/如何工作。此外,目前,我无法启动交互式外壳 - 如下所述。

事实上,即使在游乐场非 kerberized 环境中,我也观察到 flinks 交互式 shell flink start scala shell - numberformat exepction 的问题

编辑 3

我不知道发生了什么变化,但有:

cd /home/at/heilerg/development/software/flink-1.10.1
export HADOOP_CONF_DIR=/usr/hdp/current/spark2-client/conf
export HADOOP_CLASSPATH=$(hadoop classpath)

./bin/start-scala-shell.sh local

btenv.listDatabases
//res12: Array[String] = Array(default_database)

btenv.listTables
//res9: Array[String] = Array()

我可以在本地模式下获取批处理表环境。目前,虽然没有来自 hive 的表或数据库。

注意:配置设置如下:

catalogs:
   - name: hdphive
     type: hive
     hive-conf-dir: /usr/hdp/current/hive-server2/conf
     hive-version: 3.1.2

当尝试使用代码而不是配置时,我无法导入HiveCatalog

val name            = "hdphive"
val defaultDatabase = "default"
val hiveConfDir     = "/usr/hdp/current/hive-server2/conf" // a local path
val version         = "3.1.2" //"2.3.4"


import org.apache.flink.table.catalog.hive.HiveCatalog
// for now, I am failing here
// <console>:67: error: object hive is not a member of package org.apache.flink.table.catalog
//       import org.apache.flink.table.catalog.hive.HiveCatalog

val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
tableEnv.registerCatalog(name, hive)
tableEnv.useCatalog(name)

btenv.listDatabases

这些罐子被手动放入lib目录中:

无论 hive 罐子的版本如何,我都面临缺少 Hive 类:

val version         = "3.1.2" // or "3.1.0" // has the same problem
import org.apache.flink.table.catalog.hive.HiveCatalog
val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
java.lang.NoClassDefFoundError: org/apache/hadoop/hive/metastore/api/NoSuchObjectException
  ... 30 elided
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.metastore.api.NoSuchObjectException
  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
  ... 30 more

但不是:export HADOOP_CLASSPATH=$(hadoop classpath)用于加载 HDP 类?

无论如何:

cp /usr/hdp/current/hive-client/lib/hive-exec-3.1.0.<<<version>>>.jar /path/to/flink-1.10.1/lib

意味着我更进一步:

val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
btenv.registerCatalog(name, hive)
Caused by: java.lang.ClassNotFoundException: com.facebook.fb303.FacebookService$Iface

https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/添加到lib目录后

btenv.registerCatalog(name, hive)

不会抱怨未找到类异常,但执行似乎在此步骤停留了几分钟。然后,它因 kerberos 异常而失败:

Caused by: org.apache.hadoop.hive.metastore.api.MetaException: Could not connect to meta store using any of the URIs provided. Most recent failure: org.apache.thrift.transport.TTransportException: GSS initiate failed

我才意识到:

<property>
      <name>hive.metastore.kerberos.principal</name>
      <value>hive/_HOST@xxxx</value>
</property>

klist
Default principal: user@xxxx

这里的主体与 hive-site.xml 中的主体不匹配。但是,spark 可以使用此处概述的相同配置和主体不匹配很好地读取元存储。

标签: hiveapache-flinkflink-streaminghdp

解决方案


推荐阅读