I. Getting Oriented
With the source tree prepared in the previous post, we can dig in. To minimize room for misinterpretation, the analysis below works from the architecture diagram on the official site.
From that diagram, Hive's main components are easy to identify:
UI: the user-facing interface, used to submit SQL statements or script files and to monitor job progress and resource usage.
DRIVER: as the name implies, it receives the SQL statement. It is the first recipient of a submitted job, the initiator of Hive execution, and the channel through which results are returned, and it exposes execute and fetch APIs modeled on JDBC/ODBC interfaces.
COMPILER: again self-describing. It parses the statement submitted from the CLI, performs semantic analysis appropriate to the kind of query, and generates an execution plan using the table and partition metadata fetched from the metastore.
EXECUTION ENGINE: the component that executes the plan the compiler created. The plan is a DAG of stages; the engine manages the dependencies between stages and runs each stage on the appropriate component.
METASTORE: the component that stores all structural information about tables and partitions, including columns and column types, along with the serializers/deserializers needed to read and write data and the corresponding HDFS files where it is stored.
Once the SQL reaches the Driver, lexical and syntax parsing/validation happen alongside semantic analysis (which fetches the metadata); resolving the query against that metadata produces a logical plan, which is then optimized. From the finalized logical plan a physical plan is generated (materialized as execution operators), and at that point the job is handed to the underlying engine for execution.
In essence, Hive's job is the process of translating a SQL statement into something an execution engine can run.
The diagram above is too coarse to show each component's concrete responsibilities, though, so after consulting various architecture diagrams I put together a more intuitive flow (a runnable sketch follows the list, showing how to inspect the final plan yourself):
1. The HQL is submitted to the client.
2. The parser performs syntax analysis on the HQL and produces an AST.
3. The AST is traversed to build QueryBlocks (QB).
4. The Logical Plan Generator converts the QBs into a logical execution plan.
5. The Logical Optimizer optimizes the logical plan.
6. The Physical Plan Generator converts the logical plan into a physical plan.
7. The Physical Optimizer optimizes the physical plan.
8. The finished plan is handed to the engine for execution.
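Before reading the source, it helps to see the end product of this pipeline. Below is a minimal sketch, assuming a HiveServer2 at localhost:10000 with hive-jdbc on the classpath (the endpoint and table t are hypothetical placeholders); EXPLAIN returns the plan that steps 4-7 produce:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ExplainDemo {
  public static void main(String[] args) throws Exception {
    Class.forName("org.apache.hive.jdbc.HiveDriver"); // requires hive-jdbc on the classpath
    // Hypothetical endpoint and table; point these at your own HiveServer2 and data.
    try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("EXPLAIN SELECT id, COUNT(1) FROM t GROUP BY id")) {
      while (rs.next()) {
        System.out.println(rs.getString(1)); // the optimized plan, one line per row
      }
    }
  }
}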
II. Hands-On
The entry point for a submitted Hive job is the Driver class, which owns a job's entire lifecycle from start to finish.
Start from run(), follow it step by step into runInternal() and compileInternal(), and finally into compile(); there you can see the AST being created, with its value coming from ParseUtils.parse().
public CommandProcessorResponse run(String command, boolean alreadyCompiled)
    throws CommandNeedRetryException {
  // ... (error handling elided) run() delegates the real work to runInternal()
  CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
  // ... (response checking elided)
  return cpr;
}
private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
throws CommandNeedRetryException {
errorMessage = null;
SQLState = null;
downstreamError = null;
lDrvState.stateLock.lock();
try {
//has this query already been compiled?
if (alreadyCompiled) {
if (lDrvState.driverState == DriverState.COMPILED) {
lDrvState.driverState = DriverState.EXECUTING;
} else {
errorMessage = "FAILED: Precompiled query has been cancelled or closed.";
console.printError(errorMessage);
return createProcessorResponse(12);
}
} else {
lDrvState.driverState = DriverState.COMPILING;
}
} finally {
lDrvState.stateLock.unlock();
}
// a flag that helps to set the correct driver state in finally block by tracking if
// the method has been returned by an error or not.
boolean isFinishedWithError = true;
try {
HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(conf,
alreadyCompiled ? ctx.getCmd() : command);
// Get all the driver run hooks and pre-execute them.
List<HiveDriverRunHook> driverRunHooks;
try {
driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS,
HiveDriverRunHook.class);
for (HiveDriverRunHook driverRunHook : driverRunHooks) {
driverRunHook.preDriverRun(hookContext);
}
} catch (Exception e) {
errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
SQLState = ErrorMsg.findSQLState(e.getMessage());
downstreamError = e;
console.printError(errorMessage + "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
return createProcessorResponse(12);
}
PerfLogger perfLogger = null;
int ret;
//check whether the query is compiled; if not, compile it now
if (!alreadyCompiled) {
// compile internal will automatically reset the perf logger
//hand off to compileInternal()
ret = compileInternal(command, true);
// then we continue to use this perf logger
perfLogger = SessionState.getPerfLogger();
if (ret != 0) {
return createProcessorResponse(ret);
}
} else {
// reuse existing perf logger.
perfLogger = SessionState.getPerfLogger();
// Since we're reusing the compiled plan, we need to update its start time for current run
plan.setQueryStartTime(perfLogger.getStartTime(PerfLogger.DRIVER_RUN));
}
The rest of runInternal() goes on to execute the compiled plan; following the compile path first, compileInternal() acquires a compile lock and then hands off to compile():
private int compileInternal(String command, boolean deferClose) {
int ret;
Metrics metrics = MetricsFactory.getInstance();
if (metrics != null) {
metrics.incrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);
}
final ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled,
command);
if (compileLock == null) {
return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode();
}
try {
if (metrics != null) {
metrics.decrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);
}
//delegate to compile()
ret = compile(command, true, deferClose);
} finally {
compileLock.unlock();
}
compile() is where the real work starts: variable substitution, query redaction for logging, transaction-manager setup, and finally parsing:
public int compile(String command, boolean resetTaskIds, boolean deferClose) {
PerfLogger perfLogger = SessionState.getPerfLogger(true);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);
lDrvState.stateLock.lock();
try {
lDrvState.driverState = DriverState.COMPILING;
} finally {
lDrvState.stateLock.unlock();
}
command = new VariableSubstitution(new HiveVariableSource() {
@Override
public Map<String, String> getHiveVariable() {
return SessionState.get().getHiveVariables();
}
}).substitute(conf, command);
String queryStr = command;
try {
// command should be redacted to avoid to logging sensitive data
queryStr = HookUtils.redactLogString(conf, command);
} catch (Exception e) {
LOG.warn("WARNING! Query command could not be redacted." + e);
}
if (isInterrupted()) {
return handleInterruption("at beginning of compilation."); //indicate if need clean resource
}
if (ctx != null && ctx.getExplainAnalyze() != AnalyzeState.RUNNING) {
// close the existing ctx etc before compiling a new query, but does not destroy driver
closeInProcess(false);
}
if (resetTaskIds) {
TaskFactory.resetId();
}
String queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);
//save some info for webUI for use after plan is freed
this.queryDisplay.setQueryStr(queryStr);
this.queryDisplay.setQueryId(queryId);
LOG.info("Compiling command(queryId=" + queryId + "): " + queryStr);
SessionState.get().setupQueryCurrentTimestamp();
// Whether any error occurred during query compilation. Used for query lifetime hook.
boolean compileError = false;
try {
// Initialize the transaction manager. This must be done before analyze is called.
final HiveTxnManager txnManager = SessionState.get().initTxnMgr(conf);
// In case when user Ctrl-C twice to kill Hive CLI JVM, we want to release locks
// if compile is being called multiple times, clear the old shutdownhook
ShutdownHookManager.removeShutdownHook(shutdownRunner);
shutdownRunner = new Runnable() {
@Override
public void run() {
try {
releaseLocksAndCommitOrRollback(false, txnManager);
} catch (LockException e) {
LOG.warn("Exception when releasing locks in ShutdownHook for Driver: " +
e.getMessage());
}
}
};
ShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY);
if (isInterrupted()) {
return handleInterruption("before parsing and analysing the query");
}
if (ctx == null) {
ctx = new Context(conf);
}
ctx.setTryCount(getTryCount());
ctx.setCmd(command);
ctx.setHDFSCleanup(true);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARSE);
//lexical and syntax analysis: parse the command into an AST
ASTNode tree = ParseUtils.parse(command, ctx);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);
Drilling into ParseUtils.parse() (which returns the ASTNode), the core is pd.parse(): after several layers of lexical and syntax analysis, and once both analyses are confirmed error-free, the finished AST is returned.
public static ASTNode parse(
String command, Context ctx, String viewFullyQualifiedName) throws ParseException {
ParseDriver pd = new ParseDriver();
//build the AST
ASTNode tree = pd.parse(command, ctx, viewFullyQualifiedName);
tree = findRootNonNullToken(tree);
handleSetColRefs(tree);
return tree;
}
public ASTNode parse(String command, Context ctx, String viewFullyQualifiedName)
throws ParseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Parsing command: " + command);
}
//lexical analysis
HiveLexerX lexer = new HiveLexerX(new ANTLRNoCaseStringStream(command));
TokenRewriteStream tokens = new TokenRewriteStream(lexer);
if (ctx != null) {
if (viewFullyQualifiedName == null) {
// Top level query
ctx.setTokenRewriteStream(tokens);
} else {
// It is a view
ctx.addViewTokenRewriteStream(viewFullyQualifiedName, tokens);
}
lexer.setHiveConf(ctx.getConf());
}
HiveParser parser = new HiveParser(tokens);
if (ctx != null) {
parser.setHiveConf(ctx.getConf());
}
parser.setTreeAdaptor(adaptor);
HiveParser.statement_return r = null;
try {
r = parser.statement();
} catch (RecognitionException e) {
e.printStackTrace();
throw new ParseException(parser.errors);
}
if (lexer.getErrors().size() == 0 && parser.errors.size() == 0) {
LOG.debug("Parse Completed");
} else if (lexer.getErrors().size() != 0) {
throw new ParseException(lexer.getErrors());
} else {
throw new ParseException(parser.errors);
}
//retrieve the finished AST
ASTNode tree = (ASTNode) r.getTree();
tree.setUnknownTokenBoundaries();
return tree;
}
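Conveniently, this parser can be driven standalone, which is handy for inspecting ASTs while reading the code. A minimal sketch, assuming hive-exec and its dependencies are on the classpath (the table t is hypothetical):
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.ParseException;

public class AstDemo {
  public static void main(String[] args) throws ParseException {
    // parse(String) delegates to parse(command, null), so no Context or SessionState
    // setup is needed just to look at the tree.
    ASTNode tree = new ParseDriver().parse("SELECT id, COUNT(1) FROM t GROUP BY id");
    System.out.println(tree.dump()); // prints the AST, one token per line, indented
  }
}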
With the AST in hand, Hive next breaks it down into minimal query units, QBs (QueryBlocks). In SemanticAnalyzer, the doPhase1QBExpr() method traverses the AST to populate the QB information; before doing so, Hive chooses the appropriate handling based on the type of the AST root token.
public void doPhase1QBExpr(ASTNode ast, QBExpr qbexpr, String id, String alias, boolean insideView)
throws SemanticException {
assert (ast.getToken() != null);
if (ast.getToken().getType() == HiveParser.TOK_QUERY) {
//create a QB for a plain query
QB qb = new QB(id, alias, true);
qb.setInsideView(insideView);
Phase1Ctx ctx_1 = initPhase1Ctx();
doPhase1(ast, qb, ctx_1, null);
qbexpr.setOpcode(QBExpr.Opcode.NULLOP);
qbexpr.setQB(qb);
}
// setop
else {
switch (ast.getToken().getType()) {
case HiveParser.TOK_UNIONALL:
qbexpr.setOpcode(QBExpr.Opcode.UNION);
break;
case HiveParser.TOK_INTERSECTALL:
qbexpr.setOpcode(QBExpr.Opcode.INTERSECTALL);
break;
case HiveParser.TOK_INTERSECTDISTINCT:
qbexpr.setOpcode(QBExpr.Opcode.INTERSECT);
break;
case HiveParser.TOK_EXCEPTALL:
qbexpr.setOpcode(QBExpr.Opcode.EXCEPTALL);
break;
case HiveParser.TOK_EXCEPTDISTINCT:
qbexpr.setOpcode(QBExpr.Opcode.EXCEPT);
break;
default:
throw new SemanticException(ErrorMsg.UNSUPPORTED_SET_OPERATOR.getMsg("Type "
+ ast.getToken().getType()));
}
//for set operations, recurse into both branches to populate their QBs; this is a
//depth-first walk, so starting from the root node covers the entire AST
// query 1
assert (ast.getChild(0) != null);
QBExpr qbexpr1 = new QBExpr(alias + SUBQUERY_TAG_1);
doPhase1QBExpr((ASTNode) ast.getChild(0), qbexpr1, id + SUBQUERY_TAG_1, alias
+ SUBQUERY_TAG_1, insideView);
qbexpr.setQBExpr1(qbexpr1);
// query 2
assert (ast.getChild(1) != null);
QBExpr qbexpr2 = new QBExpr(alias + SUBQUERY_TAG_2);
doPhase1QBExpr((ASTNode) ast.getChild(1), qbexpr2, id + SUBQUERY_TAG_2, alias
+ SUBQUERY_TAG_2, insideView);
qbexpr.setQBExpr2(qbexpr2);
}
}
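To see why the set-operation branch recurses exactly twice, dump the AST of a union query, a small variation on the AstDemo sketch above (tables a and b are hypothetical), and locate the TOK_UNIONALL node: its two children are the subtrees that become qbexpr1 and qbexpr2.
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.ParseException;

public class UnionAstDemo {
  public static void main(String[] args) throws ParseException {
    ASTNode tree = new ParseDriver().parse("SELECT id FROM a UNION ALL SELECT id FROM b");
    // In the dump, child 0 and child 1 of TOK_UNIONALL are exactly the subtrees
    // that doPhase1QBExpr() hands to qbexpr1 and qbexpr2.
    System.out.println(tree.dump());
  }
}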
The overall analysis flow can be found in analyzeInternal(); the comments in the source are straightforward and well ordered.
void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticException {
//analysis starts from the abstract syntax tree
// 1. Generate Resolved Parse tree from syntax tree
LOG.info("Starting Semantic Analysis");
//change the location of position alias process here
processPositionAlias(ast);
if (!genResolvedParseTree(ast, plannerCtx)) {
return;
}
//2. generate the logical plan (the operator tree)
// 2. Gen OP Tree from resolved Parse Tree
Operator sinkOp = genOPTree(ast, plannerCtx);
if (!unparseTranslator.isEnabled() && tableMask.isEnabled()) {
// Here we rewrite the * and also the masking table
ASTNode tree = rewriteASTWithMaskAndFilter(tableMask, ast, ctx.getTokenRewriteStream(),
ctx, db, tabNameToTabObject, ignoredTokens);
if (tree != ast) {
ctx.setSkipTableMasking(true);
init(true);
//change the location of position alias process here
processPositionAlias(tree);
genResolvedParseTree(tree, plannerCtx);
if (this instanceof CalcitePlanner) {
((CalcitePlanner) this).resetCalciteConfiguration();
}
sinkOp = genOPTree(tree, plannerCtx);
}
}
Next the parse context is optimized. On the logical side this is Optimizer.optimize(), where every registered Transform rule rewrites the ParseContext in turn:
public ParseContext optimize() throws SemanticException {
for (Transform t : transformations) {
t.beginPerfLogging();
pctx = t.transform(pctx);
t.endPerfLogging(t.toString());
}
return pctx;
}
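Each Transform is a self-contained rewrite rule with a single-method contract. Here is a minimal no-op sketch of that contract (illustrative only, not a real Hive rule; real ones such as PredicatePushDown or ColumnPruner rewrite the operator tree held in the ParseContext):
import org.apache.hadoop.hive.ql.optimizer.Transform;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;

// A rule that changes nothing, showing only the shape every optimizer rule shares.
public class NoOpTransform extends Transform {
  @Override
  public ParseContext transform(ParseContext pctx) throws SemanticException {
    // A real rule walks pctx's operator tree here and returns the rewritten context.
    return pctx;
  }
}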
The physical plan is optimized the same way in PhysicalOptimizer.optimize(), except the rules are PhysicalPlanResolvers (e.g. Vectorizer, CommonJoinResolver) operating on a PhysicalContext:
public PhysicalContext optimize() throws SemanticException {
for (PhysicalPlanResolver r : resolvers) {
pctx = r.resolve(pctx);
}
return pctx;
}
Tasks are executed one executeTask() at a time; each Task is wrapped in a TaskRunner (a Thread subclass), which is how Hive achieves concurrency when parallel execution (hive.exec.parallel) is enabled. The launch loop in Driver.execute() looks like this:
// Launch up to maxthreads tasks
Task<? extends Serializable> task;
while ((task = driverCxt.getRunnable(maxthreads)) != null) {
TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
if (!runner.isRunning()) {
break;
}
}
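The pattern here is launch-and-collect: start every task whose dependencies are satisfied (up to maxthreads), wait for completions, and repeat until the DAG drains. A self-contained sketch of the same shape using plain JDK types (not Hive's implementation; the stage names are made up):
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LaunchLoopSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4); // stands in for TaskRunner threads
    CompletionService<String> done = new ExecutorCompletionService<>(pool);
    List<String> ready = List.of("stage-1", "stage-2", "stage-3"); // tasks with no pending deps
    for (String stage : ready) {
      done.submit(() -> stage + " finished"); // analogous to launchTask()
    }
    for (int i = 0; i < ready.size(); i++) {
      // Driver likewise polls its TaskRunners and launches newly unblocked tasks.
      System.out.println(done.take().get());
    }
    pool.shutdown();
  }
}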